From 86f745eb9d8281f1201580c9f2dfd9489c8dc725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 14 Aug 2024 17:04:13 +0200 Subject: [PATCH] Fix complexity --- relay/testec2/relay.go | 192 ++++++++++++++++++++++------------------- 1 file changed, 103 insertions(+), 89 deletions(-) diff --git a/relay/testec2/relay.go b/relay/testec2/relay.go index 9071f3dcd..08b15d3a6 100644 --- a/relay/testec2/relay.go +++ b/relay/testec2/relay.go @@ -18,8 +18,54 @@ var ( ) func relayTransfer(serverConnURL string, testData []byte, peerPairs int) { - ctx := context.Background() + connsSender := prepareConnsSender(serverConnURL, peerPairs) + defer func() { + for i := 0; i < len(connsSender); i++ { + err := connsSender[i].Close() + if err != nil { + log.Errorf("failed to close connection: %s", err) + } + } + }() + wg := sync.WaitGroup{} + wg.Add(len(connsSender)) + for _, conn := range connsSender { + go func(conn net.Conn) { + defer wg.Done() + runWriter(conn, testData) + }(conn) + } + wg.Wait() +} + +func runWriter(conn net.Conn, testData []byte) { + si := NewStartInidication(time.Now(), len(testData)) + _, err := conn.Write(si) + if err != nil { + log.Errorf("failed to write to channel: %s", err) + return + } + log.Infof("sent start indication") + + pieceSize := 1024 + testDataLen := len(testData) + + for j := 0; j < testDataLen; j += pieceSize { + end := j + pieceSize + if end > testDataLen { + end = testDataLen + } + _, writeErr := conn.Write(testData[j:end]) + if writeErr != nil { + log.Errorf("failed to write to channel: %s", writeErr) + return + } + } +} + +func prepareConnsSender(serverConnURL string, peerPairs int) []net.Conn { + ctx := context.Background() clientsSender := make([]*client.Client, peerPairs) for i := 0; i < cap(clientsSender); i++ { c := client.NewClient(ctx, serverConnURL, hmacTokenStore, "sender-"+fmt.Sprint(i)) @@ -37,51 +83,75 @@ func relayTransfer(serverConnURL string, testData []byte, peerPairs int) { } connsSender = append(connsSender, conn) } + return connsSender +} +func relayReceive(serverConnURL string, peerPairs int) []time.Duration { + connsReceiver := prepareConnsReceiver(serverConnURL, peerPairs) defer func() { - for i := 0; i < len(connsSender); i++ { - err := connsSender[i].Close() - if err != nil { + for i := 0; i < len(connsReceiver); i++ { + if err := connsReceiver[i].Close(); err != nil { log.Errorf("failed to close connection: %s", err) } } }() + durations := make(chan time.Duration, len(connsReceiver)) wg := sync.WaitGroup{} - var writeErr error - for i, conn := range connsSender { + for _, conn := range connsReceiver { wg.Add(1) - go func(i int, conn net.Conn) { + go func(conn net.Conn) { defer wg.Done() - - si := NewStartInidication(time.Now(), len(testData)) - _, err := conn.Write(si) - if err != nil { - log.Errorf("failed to write to channel: %s", err) - return - } - log.Infof("sent start indication") - - pieceSize := 1024 - testDataLen := len(testData) - - for j := 0; j < testDataLen; j += pieceSize { - end := j + pieceSize - if end > testDataLen { - end = testDataLen - } - _, writeErr = conn.Write(testData[j:end]) - if writeErr != nil { - log.Errorf("failed to write to channel: %s", writeErr) - return - } - } - }(i, conn) + duration := runReader(conn) + durations <- duration + }(conn) } wg.Wait() + + durationsList := make([]time.Duration, 0, len(connsReceiver)) + for d := range durations { + durationsList = append(durationsList, d) + if len(durationsList) == len(connsReceiver) { + close(durations) + } + } + + return durationsList } -func relayReceive(serverConnURL string, peerPairs int) []time.Duration { +func runReader(conn net.Conn) time.Duration { + buf := make([]byte, 8192) + + n, readErr := conn.Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return 0 + } + + si := DecodeStartIndication(buf[:n]) + log.Infof("received start indication: %v", si) + + receivedSize, err := conn.Read(buf) + if err != nil { + log.Fatalf("failed to read from relay: %s", err) + } + now := time.Now() + + rcv := 0 + for receivedSize < si.TransferSize { + n, readErr = conn.Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return 0 + } + + receivedSize += n + rcv += n + } + return time.Since(now) +} + +func prepareConnsReceiver(serverConnURL string, peerPairs int) []net.Conn { clientsReceiver := make([]*client.Client, peerPairs) for i := 0; i < cap(clientsReceiver); i++ { c := client.NewClient(context.Background(), serverConnURL, hmacTokenStore, "receiver-"+fmt.Sprint(i)) @@ -100,61 +170,5 @@ func relayReceive(serverConnURL string, peerPairs int) []time.Duration { } connsReceiver = append(connsReceiver, conn) } - - defer func() { - for i := 0; i < len(connsReceiver); i++ { - if err := connsReceiver[i].Close(); err != nil { - log.Errorf("failed to close connection: %s", err) - } - } - }() - - durations := make(chan time.Duration, len(connsReceiver)) - wg := sync.WaitGroup{} - for i, conn := range connsReceiver { - wg.Add(1) - go func(i int, conn net.Conn) { - defer wg.Done() - buf := make([]byte, 8192) - - n, readErr := conn.Read(buf) - if readErr != nil { - log.Errorf("failed to read from channel: %s", readErr) - return - } - - si := DecodeStartIndication(buf[:n]) - log.Infof("received start indication: %v", si) - - receivedSize, err := conn.Read(buf) - if err != nil { - log.Fatalf("failed to read from relay: %s", err) - } - now := time.Now() - - rcv := 0 - for receivedSize < si.TransferSize { - n, readErr = conn.Read(buf) - if readErr != nil { - log.Errorf("failed to read from channel: %s", readErr) - return - } - - receivedSize += n - rcv += n - } - durations <- time.Since(now) - }(i, conn) - } - wg.Wait() - - durationsList := make([]time.Duration, 0, len(connsReceiver)) - for d := range durations { - durationsList = append(durationsList, d) - if len(durationsList) == len(connsReceiver) { - close(durations) - } - } - - return durationsList + return connsReceiver }