From b31f25ec70f107be6d36316953209b25d72ebba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 12 Aug 2024 15:32:01 +0200 Subject: [PATCH] Add test code --- relay/teste2c/main.go | 221 +++++++++++++++++++++++++++++++++++++ relay/teste2c/relay.go | 146 ++++++++++++++++++++++++ relay/teste2c/signal.go | 91 +++++++++++++++ relay/teste2c/start_msg.go | 37 +++++++ relay/teste2c/turn.go | 156 ++++++++++++++++++++++++++ 5 files changed, 651 insertions(+) create mode 100644 relay/teste2c/main.go create mode 100644 relay/teste2c/relay.go create mode 100644 relay/teste2c/signal.go create mode 100644 relay/teste2c/start_msg.go create mode 100644 relay/teste2c/turn.go diff --git a/relay/teste2c/main.go b/relay/teste2c/main.go new file mode 100644 index 000000000..94e3fb032 --- /dev/null +++ b/relay/teste2c/main.go @@ -0,0 +1,221 @@ +package main + +import ( + "crypto/rand" + "flag" + "net" + "sync" + "time" + + log "github.com/sirupsen/logrus" +) + +var ( + dataSize = 1024 * 1024 * 10 // 5MB + pairs = []int{1, 3, 5, 10, 50, 100} + relaySrvAddress = "rel://relay-eu1.stage.npeer.io:80" + turnSrvAddress = "relay-eu1.stage.npeer.io:3478" + signalAddress = "http://172.20.8.77:8081" // ip address of the receiver instance + signalListenAddress = ":8081" +) + +type testResult struct { + numOfPairs int + duration time.Duration + speed float64 +} + +func seedRandomData(size int) ([]byte, error) { + token := make([]byte, size) + _, err := rand.Read(token) + if err != nil { + return nil, err + } + return token, nil +} + +func avg(transferDuration []time.Duration) (time.Duration, float64) { + var totalDuration time.Duration + for _, d := range transferDuration { + totalDuration += d + } + avgDuration := totalDuration / time.Duration(len(transferDuration)) + mbps := float64(dataSize) / avgDuration.Seconds() / 1024 / 1024 + return avgDuration, mbps +} + +func RelayReceiverMain() []testResult { + testResults := make([]testResult, 0, len(pairs)) + for _, p := range pairs { + tr := testResult{numOfPairs: p} + td := relayReceive(relaySrvAddress, p) + tr.duration, tr.speed = avg(td) + + testResults = append(testResults, tr) + } + + return testResults +} + +func RelaySenderMain() { + log.Infof("starting sender") + log.Infof("starting seed phase") + + testData, err := seedRandomData(dataSize) + if err != nil { + log.Fatalf("failed to seed random data: %s", err) + } + + log.Infof("data size: %d", len(testData)) + + for n, p := range pairs { + log.Infof("running test with %d pairs", p) + relayTransfer(relaySrvAddress, testData, p) + + // give time to prepare new receivers + if n < len(pairs)-1 { + time.Sleep(3 * time.Second) + } + } + +} + +// TRUNServerMain is the sender +// - allocate turn clients +// - send relayed addresses to signal server in batch +// - wait for signal server to send back addresses in a map +// - send test data to each address in parallel +func TRUNServerMain() { + log.Infof("starting turn test") + + log.Infof("starting seed random data: %d", dataSize) + testData, err := seedRandomData(dataSize) + if err != nil { + log.Fatalf("failed to seed random data: %s", err) + } + + ss := SignalClient{signalAddress} + + for _, p := range pairs { + log.Infof("running test with %d pairs", p) + turnConns := make(map[string]*TurnConn) + addresses := make([]string, 0, len(pairs)) + for i := 0; i < p; i++ { + tc := AllocateTurnClient(turnSrvAddress) + log.Infof("allocated turn client: %s", tc.Address().String()) + turnConns[tc.Address().String()] = tc + addresses = append(addresses, tc.Address().String()) + } + + log.Infof("send addresses via signal server: %v", addresses) + clientAddresses, err := ss.SendAddress(addresses) + if err != nil { + log.Fatalf("failed to send address: %s", err) + } + + wg := sync.WaitGroup{} + wg.Add(len(clientAddresses.Address)) + for k, v := range clientAddresses.Address { + go func(k, v string) { + log.Infof("sending test data to: %s", v) + defer wg.Done() + tc, ok := turnConns[k] + if !ok { + log.Fatalf("failed to find turn conn: %s", k) + } + addr, err := net.ResolveUDPAddr("udp", v) + if err != nil { + log.Fatalf("failed to resolve udp address: %s", err) + } + tc.WriteTestData(testData, addr) + }(k, v) + } + wg.Wait() + } +} + +func TURNClientMain() []testResult { + log.Infof("starting turn client test") + si := NewSignalService() + go func() { + log.Infof("starting signal server") + err := si.Listen(signalListenAddress) + if err != nil { + log.Errorf("failed to listen: %s", err) + } + }() + + testResults := make([]testResult, 0, len(pairs)) + for _ = range pairs { + log.Infof("waiting for addresses") + addresses := <-si.AddressesChan + log.Infof("received addresses: %d", len(addresses)) + + conns := make([]*UDPConn, 0, len(addresses)) + clientAddresses := make(map[string]string, len(addresses)) + for _, addr := range addresses { + conn, err := Dial(addr) + if err != nil { + log.Fatalf("failed to dial: %s", err) + } + log.Infof("made client UDP conn: %s", conn.LocalAddr()) + conns = append(conns, conn) + clientAddresses[addr] = conn.LocalAddr().String() + } + + // send back local addresses + log.Infof("response addresses back: %v", clientAddresses) + si.ClientAddressChan <- clientAddresses + + durations := make(chan time.Duration, len(conns)) + for _, c := range conns { + go func(c *UDPConn) { + log.Infof("start to read test data from: %s", c.RemoteAddr()) + duration := c.ReadTestData(c) + durations <- duration + _ = c.Close() + }(c) + } + + durationsList := make([]time.Duration, 0, len(conns)) + for d := range durations { + durationsList = append(durationsList, d) + if len(durationsList) == len(conns) { + close(durations) + } + } + + avgDuration, avgSpeed := avg(durationsList) + ts := testResult{ + numOfPairs: len(conns), + duration: avgDuration, + speed: avgSpeed, + } + testResults = append(testResults, ts) + } + + return testResults +} + +func main() { + log.SetLevel(log.DebugLevel) + var mode string + flag.StringVar(&mode, "mode", "sender", "sender or receiver mode") + flag.Parse() + + if mode == "receiver" { + relayResult := RelayReceiverMain() + time.Sleep(3 * time.Second) + turnResults := TURNClientMain() + + for i := 0; i < len(turnResults); i++ { + log.Infof("pairs: %d, relay duration: %s, relay speed: %.2f MB/s", relayResult[i].numOfPairs, relayResult[i].duration, relayResult[i].speed) + log.Infof("pairs: %d, turn duration: %s, turn speed: %.2f MB/s", turnResults[i].numOfPairs, turnResults[i].duration, turnResults[i].speed) + } + } else { + RelaySenderMain() + // grant time for receiver to start + time.Sleep(6 * time.Second) + TRUNServerMain() + } +} diff --git a/relay/teste2c/relay.go b/relay/teste2c/relay.go new file mode 100644 index 000000000..0e370dff8 --- /dev/null +++ b/relay/teste2c/relay.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/relay/auth/hmac" + "github.com/netbirdio/netbird/relay/client" +) + +var ( + hmacTokenStore = &hmac.TokenStore{} +) + +func relayTransfer(serverConnURL string, testData []byte, peerPairs int) { + ctx := context.Background() + + clientsSender := make([]*client.Client, peerPairs) + for i := 0; i < cap(clientsSender); i++ { + c := client.NewClient(ctx, serverConnURL, hmacTokenStore, "sender-"+fmt.Sprint(i)) + if err := c.Connect(); err != nil { + log.Fatalf("failed to connect to server: %s", err) + } + clientsSender[i] = c + } + + connsSender := make([]net.Conn, 0, peerPairs) + for i := 0; i < len(clientsSender); i++ { + conn, err := clientsSender[i].OpenConn("receiver-" + fmt.Sprint(i)) + if err != nil { + log.Fatalf("failed to bind channel: %s", err) + } + connsSender = append(connsSender, conn) + } + + defer func() { + for i := 0; i < len(connsSender); i++ { + err := connsSender[i].Close() + if err != nil { + log.Errorf("failed to close connection: %s", err) + } + } + }() + + wg := sync.WaitGroup{} + var writeErr error + for i := 0; i < len(connsSender); i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + si := NewStartInidication(time.Now(), len(testData)) + _, err := connsSender[i].Write(si) + if err != nil { + log.Errorf("failed to write to channel: %s", err) + return + } + log.Infof("sent start indication") + + pieceSize := 1024 + testDataLen := len(testData) + + for j := 0; j < testDataLen; j += pieceSize { + end := j + pieceSize + if end > testDataLen { + end = testDataLen + } + _, writeErr = connsSender[i].Write(testData[j:end]) + if writeErr != nil { + log.Errorf("failed to write to channel: %s", writeErr) + return + } + } + }(i) + } + wg.Wait() +} + +func relayReceive(serverConnURL string, peerPairs int) []time.Duration { + clientsReceiver := make([]*client.Client, peerPairs) + for i := 0; i < cap(clientsReceiver); i++ { + c := client.NewClient(context.Background(), serverConnURL, hmacTokenStore, "receiver-"+fmt.Sprint(i)) + err := c.Connect() + if err != nil { + log.Fatalf("failed to connect to server: %s", err) + } + clientsReceiver[i] = c + } + + connsReceiver := make([]net.Conn, 0, peerPairs) + for i := 0; i < len(clientsReceiver); i++ { + conn, err := clientsReceiver[i].OpenConn("sender-" + fmt.Sprint(i)) + if err != nil { + log.Fatalf("failed to bind channel: %s", err) + } + connsReceiver = append(connsReceiver, conn) + } + + defer func() { + for i := 0; i < len(connsReceiver); i++ { + if err := connsReceiver[i].Close(); err != nil { + log.Errorf("failed to close connection: %s", err) + } + } + }() + + var transferDuration []time.Duration + wg := sync.WaitGroup{} + for i := 0; i < peerPairs; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + buf := make([]byte, 8192) + + n, readErr := connsReceiver[i].Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return + } + + si := DecodeStartIndication(buf[:n]) + log.Infof("received start indication: %v", si) + + rcv := 0 + for receivedSize := 0; receivedSize < si.TransferSize; { + n, readErr = connsReceiver[i].Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return + } + + receivedSize += n + rcv += n + } + transferDuration = append(transferDuration, time.Since(si.Started)) + }(i) + } + + wg.Wait() + return transferDuration +} diff --git a/relay/teste2c/signal.go b/relay/teste2c/signal.go new file mode 100644 index 000000000..4867d6dd0 --- /dev/null +++ b/relay/teste2c/signal.go @@ -0,0 +1,91 @@ +package main + +import ( + "bytes" + "encoding/json" + "net/http" + + log "github.com/sirupsen/logrus" +) + +type PeerAddr struct { + Address []string +} + +type ClientPeerAddr struct { + Address map[string]string +} + +type Signal struct { + AddressesChan chan []string + ClientAddressChan chan map[string]string +} + +func NewSignalService() *Signal { + return &Signal{ + AddressesChan: make(chan []string, 0), + ClientAddressChan: make(chan map[string]string, 0), + } +} + +func (rs *Signal) Listen(listenAdddr string) error { + http.HandleFunc("/", rs.onNewAddresses) + return http.ListenAndServe(listenAdddr, nil) +} + +func (rs *Signal) onNewAddresses(w http.ResponseWriter, r *http.Request) { + var msg PeerAddr + err := json.NewDecoder(r.Body).Decode(&msg) + if err != nil { + log.Errorf("Error decoding message: %v", err) + } + + log.Infof("received addresses: %v", msg.Address) + rs.AddressesChan <- msg.Address + clientAddresses := <-rs.ClientAddressChan + log.Infof("Sending back addresses: %v", clientAddresses) + + respMsg := ClientPeerAddr{ + Address: clientAddresses, + } + data, err := json.Marshal(respMsg) + if err != nil { + log.Errorf("Error marshalling message: %v", err) + return + } + + _, err = w.Write(data) + if err != nil { + log.Errorf("Error writing response: %v", err) + } +} + +// "http://localhost:8080/address" +type SignalClient struct { + SignalAddress string +} + +func (ss SignalClient) SendAddress(addresses []string) (*ClientPeerAddr, error) { + msg := PeerAddr{ + Address: addresses, + } + data, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + response, err := http.Post(ss.SignalAddress, "application/json", bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + defer response.Body.Close() + + log.Debugf("wait for signal response") + var respPeerAddress ClientPeerAddr + err = json.NewDecoder(response.Body).Decode(&respPeerAddress) + if err != nil { + return nil, err + } + return &respPeerAddress, nil +} diff --git a/relay/teste2c/start_msg.go b/relay/teste2c/start_msg.go new file mode 100644 index 000000000..156ee52d3 --- /dev/null +++ b/relay/teste2c/start_msg.go @@ -0,0 +1,37 @@ +package main + +import ( + "bytes" + "encoding/gob" + "time" + + log "github.com/sirupsen/logrus" +) + +type StartIndication struct { + Started time.Time + TransferSize int +} + +func NewStartInidication(started time.Time, transferSize int) []byte { + si := StartIndication{ + Started: started, + TransferSize: transferSize, + } + + var data bytes.Buffer + err := gob.NewEncoder(&data).Encode(si) + if err != nil { + log.Fatal("encode error:", err) + } + return data.Bytes() +} + +func DecodeStartIndication(data []byte) StartIndication { + var si StartIndication + err := gob.NewDecoder(bytes.NewReader(data)).Decode(&si) + if err != nil { + log.Fatal("decode error:", err) + } + return si +} diff --git a/relay/teste2c/turn.go b/relay/teste2c/turn.go new file mode 100644 index 000000000..355511d60 --- /dev/null +++ b/relay/teste2c/turn.go @@ -0,0 +1,156 @@ +package main + +import ( + "fmt" + "net" + "time" + + "github.com/pion/logging" + "github.com/pion/turn/v3" + log "github.com/sirupsen/logrus" +) + +type TurnConn struct { + conn net.Conn + turnClient *turn.Client + relayConn net.PacketConn +} + +func (tc *TurnConn) Address() net.Addr { + return tc.relayConn.LocalAddr() +} + +func (tc *TurnConn) Close() { + _ = tc.relayConn.Close() + tc.turnClient.Close() + _ = tc.conn.Close() +} + +func AllocateTurnClient(serverAddr string) *TurnConn { + conn, err := net.Dial("tcp", serverAddr) + if err != nil { + log.Fatal(err) + } + + turnClient, err := getTurnClient(serverAddr, conn) + if err != nil { + log.Fatal(err) + } + + relayConn, err := turnClient.Allocate() + if err != nil { + log.Fatal(err) + } + + return &TurnConn{ + conn: conn, + turnClient: turnClient, + relayConn: relayConn, + } +} + +func (tc *TurnConn) WriteTestData(testData []byte, dstAddr net.Addr) { + log.Infof("write test data to: %s", dstAddr) + testDataSize := len(testData) + si := NewStartInidication(time.Now(), testDataSize) + _, err := tc.relayConn.WriteTo(si, dstAddr) + if err != nil { + log.Errorf("failed to write to: %s, %s", dstAddr, err) + return + } + + pieceSize := 1024 + ackBuff := make([]byte, 1) + pipelineSize := 10 + for j := 0; j < testDataSize; j += pieceSize { + end := j + pieceSize + if end > testDataSize { + end = testDataSize + } + _, err := tc.relayConn.WriteTo(testData[j:end], dstAddr) + if err != nil { + log.Fatalf("failed to write to channel: %s", err) + } + if pipelineSize == 0 { + _, _, _ = tc.relayConn.ReadFrom(ackBuff) + } else { + pipelineSize-- + } + } +} + +func getTurnClient(address string, conn net.Conn) (*turn.Client, error) { + // Dial TURN Server + addrStr := fmt.Sprintf("%s:%d", address, 443) + + fac := logging.NewDefaultLoggerFactory() + //fac.DefaultLogLevel = logging.LogLevelTrace + + // Start a new TURN Client and wrap our net.Conn in a STUNConn + // This allows us to simulate datagram based communication over a net.Conn + cfg := &turn.ClientConfig{ + TURNServerAddr: address, + Conn: turn.NewSTUNConn(conn), + Username: "test", + Password: "test", + LoggerFactory: fac, + } + + client, err := turn.NewClient(cfg) + if err != nil { + return nil, fmt.Errorf("failed to create TURN client for server %s: %s", addrStr, err) + } + + // Start listening on the conn provided. + err = client.Listen() + if err != nil { + client.Close() + return nil, fmt.Errorf("failed to listen on TURN client for server %s: %s", addrStr, err) + } + + return client, nil +} + +type UDPConn struct { + net.Conn +} + +func Dial(addr string) (*UDPConn, error) { + conn, err := net.Dial("udp", addr) + if err != nil { + return nil, err + } + return &UDPConn{conn}, nil +} + +func (c UDPConn) ReadTestData(c2 *UDPConn) time.Duration { + log.Infof("reading test data from TURN relay") + var ( + tb int + ack = make([]byte, 1) + ) + buff := make([]byte, 8192) + n, err := c.Conn.Read(buff) + if err != nil { + log.Errorf("failed to read from channel: %s", err) + return 0 + } + + si := DecodeStartIndication(buff[:n]) + log.Infof("received start indication: %v", si) + + for { + n, e := c.Conn.Read(buff) + if e != nil { + return 0 + } + tb += n + _, _ = c.Conn.Write(ack) + + if tb >= si.TransferSize { + break + } + } + + return time.Since(si.Started) +}