diff --git a/relay/testec2/main.go b/relay/testec2/main.go new file mode 100644 index 000000000..f182df7a7 --- /dev/null +++ b/relay/testec2/main.go @@ -0,0 +1,383 @@ +package main + +import ( + "crypto/rand" + "flag" + "fmt" + "net" + "os" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/relay/testec2/tun" + "github.com/netbirdio/netbird/util" +) + +var ( + dataSize = 1024 * 1024 * 50 // 50MB + pairs = []int{1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100} + signalListenAddress = ":8081" + + relaySrvAddress string + turnSrvAddress string + signalURL string + udpListener string +) + +type testResult struct { + numOfPairs int + duration time.Duration + speed float64 +} + +func (tr testResult) Speed() string { + speed := tr.speed + var unit string + + switch { + case speed < 1024: + unit = "B/s" + case speed < 1048576: + speed /= 1024 + unit = "KB/s" + case speed < 1073741824: + speed /= 1048576 + unit = "MB/s" + default: + speed /= 1073741824 + unit = "GB/s" + } + + return fmt.Sprintf("%.2f %s", speed, unit) +} + +func seedRandomData(size int) ([]byte, error) { + token := make([]byte, size) + _, err := rand.Read(token) + if err != nil { + return nil, err + } + return token, nil +} + +func avg(transferDuration []time.Duration) (time.Duration, float64) { + var totalDuration time.Duration + for _, d := range transferDuration { + totalDuration += d + } + avgDuration := totalDuration / time.Duration(len(transferDuration)) + bps := float64(dataSize) / avgDuration.Seconds() + return avgDuration, bps +} + +func RelayReceiverMain() []testResult { + testResults := make([]testResult, 0, len(pairs)) + for _, p := range pairs { + tr := testResult{numOfPairs: p} + td := relayReceive(relaySrvAddress, p) + tr.duration, tr.speed = avg(td) + + testResults = append(testResults, tr) + } + + return testResults +} + +func RelaySenderMain() { + log.Infof("starting sender") + log.Infof("starting seed phase") + + testData, err := seedRandomData(dataSize) + if err != nil { + log.Fatalf("failed to seed random data: %s", err) + } + + log.Infof("data size: %d", len(testData)) + + for n, p := range pairs { + log.Infof("running test with %d pairs", p) + relayTransfer(relaySrvAddress, testData, p) + + // grant time to prepare new receivers + if n < len(pairs)-1 { + time.Sleep(3 * time.Second) + } + } + +} + +// TRUNSenderMain is the sender +// - allocate turn clients +// - send relayed addresses to signal server in batch +// - wait for signal server to send back addresses in a map +// - send test data to each address in parallel +func TRUNSenderMain() { + log.Infof("starting TURN sender test") + + log.Infof("starting seed random data: %d", dataSize) + testData, err := seedRandomData(dataSize) + if err != nil { + log.Fatalf("failed to seed random data: %s", err) + } + + ss := SignalClient{signalURL} + + for _, p := range pairs { + log.Infof("running test with %d pairs", p) + + turnConns := make(map[string]*TurnConn) + addresses := make([]string, 0, len(pairs)) + for i := 0; i < p; i++ { + tc := AllocateTurnClient(turnSrvAddress) + log.Infof("allocated turn client: %s", tc.Address().String()) + turnConns[tc.Address().String()] = tc + addresses = append(addresses, tc.Address().String()) + } + + log.Infof("send addresses via signal server: %d", len(addresses)) + clientAddresses, err := ss.SendAddress(addresses) + if err != nil { + log.Fatalf("failed to send address: %s", err) + } + log.Infof("received addresses: %v", clientAddresses.Address) + + var i int + devices := make([]*tun.Device, 0, len(clientAddresses.Address)) + for k, v := range clientAddresses.Address { + tc, ok := turnConns[k] + if !ok { + log.Fatalf("failed to find turn conn: %s", k) + } + + addr, err := net.ResolveUDPAddr("udp", v) + if err != nil { + log.Fatalf("failed to resolve udp address: %s", err) + } + device := &tun.Device{ + Name: fmt.Sprintf("mtun-sender-%d", i), + IP: fmt.Sprintf("10.0.%d.1", i), + PConn: tc.relayConn, + DstAddr: addr, + } + + err = device.Up() + if err != nil { + log.Fatalf("failed to bring up device: %s", err) + } + + devices = append(devices, device) + i++ + } + + log.Infof("waiting for tcpListeners to be ready") + time.Sleep(2 * time.Second) + + tcpConns := make([]net.Conn, 0, len(devices)) + for i := range devices { + addr := fmt.Sprintf("10.0.%d.2:9999", i) + log.Infof("dialing: %s", addr) + tcpConn, err := net.Dial("tcp", addr) + if err != nil { + log.Fatalf("failed to dial tcp: %s", err) + } + tcpConns = append(tcpConns, tcpConn) + } + + log.Infof("start test data transfer for %d pairs", len(devices)) + testDataLen := len(testData) + wg := sync.WaitGroup{} + for i, tcpConn := range tcpConns { + log.Infof("sending test data to device: %d", i) + wg.Add(1) + go func(i int, tcpConn net.Conn) { + defer wg.Done() + defer tcpConn.Close() + + log.Infof("start to sending test data: %s", tcpConn.RemoteAddr()) + + si := NewStartInidication(time.Now(), testDataLen) + _, err = tcpConn.Write(si) + if err != nil { + log.Errorf("failed to write to tcp: %s", err) + return + } + + pieceSize := 1024 + for j := 0; j < testDataLen; j += pieceSize { + end := j + pieceSize + if end > testDataLen { + end = testDataLen + } + _, writeErr := tcpConn.Write(testData[j:end]) + if writeErr != nil { + log.Errorf("failed to write to tcp conn: %s", writeErr) + return + } + } + + time.Sleep(3 * time.Second) + }(i, tcpConn) + } + wg.Wait() + + for _, d := range devices { + _ = d.Close() + } + + log.Infof("test finished with %d pairs", p) + } +} + +func TURNReaderMain() []testResult { + log.Infof("starting TURN receiver test") + si := NewSignalService() + go func() { + log.Infof("starting signal server") + err := si.Listen(signalListenAddress) + if err != nil { + log.Errorf("failed to listen: %s", err) + } + }() + + testResults := make([]testResult, 0, len(pairs)) + for range pairs { + log.Infof("waiting for addresses") + addresses := <-si.AddressesChan + log.Infof("received addresses: %d", len(addresses)) + + conns := make([]*net.UDPConn, 0, len(addresses)) + clientAddresses := make(map[string]string, len(addresses)) + devices := make([]*tun.Device, 0, len(conns)) + for i, addr := range addresses { + localAddr, err := net.ResolveUDPAddr("udp", udpListener) + if err != nil { + log.Fatalf("failed to resolve UDP address: %s", err) + } + + conn, err := net.ListenUDP("udp", localAddr) + if err != nil { + log.Fatalf("failed to create UDP connection: %s", err) + } + + conns = append(conns, conn) + clientAddresses[addr] = conn.LocalAddr().String() + + dstAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + log.Fatalf("failed to resolve address: %s", err) + } + + device := &tun.Device{ + Name: fmt.Sprintf("mtun-%d", i), + IP: fmt.Sprintf("10.0.%d.2", i), + PConn: conn, + DstAddr: dstAddr, + } + + err = device.Up() + if err != nil { + log.Fatalf("failed to bring up device: %s, %s", device.Name, err) + } + devices = append(devices, device) + } + + // send back local addresses + log.Infof("response addresses back: %d", len(clientAddresses)) + si.ClientAddressChan <- clientAddresses + + durations := make(chan time.Duration, len(conns)) + for _, d := range devices { + go func(d *tun.Device) { + tcpListener, err := net.Listen("tcp", d.IP+":9999") + if err != nil { + log.Fatalf("failed to listen on tcp: %s", err) + } + defer tcpListener.Close() + log := log.WithField("device", tcpListener.Addr()) + + tcpConn, err := tcpListener.Accept() + if err != nil { + log.Fatalf("failed to accept connection: %s", err) + } + log.Infof("remote peer connected") + + buf := make([]byte, 103) + n, err := tcpConn.Read(buf) + if err != nil { + log.Fatalf("failed to read from tcp: %s", err) + } + + si := DecodeStartIndication(buf[:n]) + log.Infof("received start indication: %v, %d", si, n) + + buf = make([]byte, 8192) + i, err := tcpConn.Read(buf) + if err != nil { + log.Fatalf("failed to read from tcp: %s", err) + } + now := time.Now() + + for i < si.TransferSize { + n, err := tcpConn.Read(buf) + if err != nil { + log.Fatalf("failed to read from tcp: %s", err) + } + i += n + } + log.Infof("finished reading") + durations <- time.Since(now) + }(d) + } + + durationsList := make([]time.Duration, 0, len(conns)) + for d := range durations { + durationsList = append(durationsList, d) + if len(durationsList) == len(conns) { + close(durations) + } + } + + avgDuration, avgSpeed := avg(durationsList) + ts := testResult{ + numOfPairs: len(conns), + duration: avgDuration, + speed: avgSpeed, + } + testResults = append(testResults, ts) + + for _, d := range devices { + _ = d.Close() + } + } + + return testResults +} + +func main() { + var mode string + + _ = util.InitLog("debug", "console") + flag.StringVar(&mode, "mode", "sender", "sender or receiver mode") + flag.Parse() + + relaySrvAddress = os.Getenv("TEST_RELAY_SERVER") // rel://ip:port + turnSrvAddress = os.Getenv("TEST_TURN_SERVER") // ip:3478 + signalURL = os.Getenv("TEST_SIGNAL_URL") // http://receiver_ip:8081 + udpListener = os.Getenv("TEST_UDP_LISTENER") // IP:0 + + if mode == "receiver" { + relayResult := RelayReceiverMain() + turnResults := TURNReaderMain() + for i := 0; i < len(turnResults); i++ { + log.Infof("pairs: %d,\tRelay speed:\t%s,\trelay duration:\t%s", relayResult[i].numOfPairs, relayResult[i].Speed(), relayResult[i].duration) + log.Infof("pairs: %d,\tTURN speed:\t%s,\tturn duration:\t%s", turnResults[i].numOfPairs, turnResults[i].Speed(), turnResults[i].duration) + } + } else { + RelaySenderMain() + // grant time for receiver to start + time.Sleep(3 * time.Second) + TRUNSenderMain() + } +} diff --git a/relay/testec2/relay.go b/relay/testec2/relay.go new file mode 100644 index 000000000..9071f3dcd --- /dev/null +++ b/relay/testec2/relay.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/relay/auth/hmac" + "github.com/netbirdio/netbird/relay/client" +) + +var ( + hmacTokenStore = &hmac.TokenStore{} +) + +func relayTransfer(serverConnURL string, testData []byte, peerPairs int) { + ctx := context.Background() + + clientsSender := make([]*client.Client, peerPairs) + for i := 0; i < cap(clientsSender); i++ { + c := client.NewClient(ctx, serverConnURL, hmacTokenStore, "sender-"+fmt.Sprint(i)) + if err := c.Connect(); err != nil { + log.Fatalf("failed to connect to server: %s", err) + } + clientsSender[i] = c + } + + connsSender := make([]net.Conn, 0, peerPairs) + for i := 0; i < len(clientsSender); i++ { + conn, err := clientsSender[i].OpenConn("receiver-" + fmt.Sprint(i)) + if err != nil { + log.Fatalf("failed to bind channel: %s", err) + } + connsSender = append(connsSender, conn) + } + + defer func() { + for i := 0; i < len(connsSender); i++ { + err := connsSender[i].Close() + if err != nil { + log.Errorf("failed to close connection: %s", err) + } + } + }() + + wg := sync.WaitGroup{} + var writeErr error + for i, conn := range connsSender { + wg.Add(1) + go func(i int, conn net.Conn) { + defer wg.Done() + + si := NewStartInidication(time.Now(), len(testData)) + _, err := conn.Write(si) + if err != nil { + log.Errorf("failed to write to channel: %s", err) + return + } + log.Infof("sent start indication") + + pieceSize := 1024 + testDataLen := len(testData) + + for j := 0; j < testDataLen; j += pieceSize { + end := j + pieceSize + if end > testDataLen { + end = testDataLen + } + _, writeErr = conn.Write(testData[j:end]) + if writeErr != nil { + log.Errorf("failed to write to channel: %s", writeErr) + return + } + } + }(i, conn) + } + wg.Wait() +} + +func relayReceive(serverConnURL string, peerPairs int) []time.Duration { + clientsReceiver := make([]*client.Client, peerPairs) + for i := 0; i < cap(clientsReceiver); i++ { + c := client.NewClient(context.Background(), serverConnURL, hmacTokenStore, "receiver-"+fmt.Sprint(i)) + err := c.Connect() + if err != nil { + log.Fatalf("failed to connect to server: %s", err) + } + clientsReceiver[i] = c + } + + connsReceiver := make([]net.Conn, 0, peerPairs) + for i := 0; i < len(clientsReceiver); i++ { + conn, err := clientsReceiver[i].OpenConn("sender-" + fmt.Sprint(i)) + if err != nil { + log.Fatalf("failed to bind channel: %s", err) + } + connsReceiver = append(connsReceiver, conn) + } + + defer func() { + for i := 0; i < len(connsReceiver); i++ { + if err := connsReceiver[i].Close(); err != nil { + log.Errorf("failed to close connection: %s", err) + } + } + }() + + durations := make(chan time.Duration, len(connsReceiver)) + wg := sync.WaitGroup{} + for i, conn := range connsReceiver { + wg.Add(1) + go func(i int, conn net.Conn) { + defer wg.Done() + buf := make([]byte, 8192) + + n, readErr := conn.Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return + } + + si := DecodeStartIndication(buf[:n]) + log.Infof("received start indication: %v", si) + + receivedSize, err := conn.Read(buf) + if err != nil { + log.Fatalf("failed to read from relay: %s", err) + } + now := time.Now() + + rcv := 0 + for receivedSize < si.TransferSize { + n, readErr = conn.Read(buf) + if readErr != nil { + log.Errorf("failed to read from channel: %s", readErr) + return + } + + receivedSize += n + rcv += n + } + durations <- time.Since(now) + }(i, conn) + } + wg.Wait() + + durationsList := make([]time.Duration, 0, len(connsReceiver)) + for d := range durations { + durationsList = append(durationsList, d) + if len(durationsList) == len(connsReceiver) { + close(durations) + } + } + + return durationsList +} diff --git a/relay/testec2/signal.go b/relay/testec2/signal.go new file mode 100644 index 000000000..4253cb5e6 --- /dev/null +++ b/relay/testec2/signal.go @@ -0,0 +1,89 @@ +package main + +import ( + "bytes" + "encoding/json" + "net/http" + + log "github.com/sirupsen/logrus" +) + +type PeerAddr struct { + Address []string +} + +type ClientPeerAddr struct { + Address map[string]string +} + +type Signal struct { + AddressesChan chan []string + ClientAddressChan chan map[string]string +} + +func NewSignalService() *Signal { + return &Signal{ + AddressesChan: make(chan []string), + ClientAddressChan: make(chan map[string]string), + } +} + +func (rs *Signal) Listen(listenAddr string) error { + http.HandleFunc("/", rs.onNewAddresses) + return http.ListenAndServe(listenAddr, nil) +} + +func (rs *Signal) onNewAddresses(w http.ResponseWriter, r *http.Request) { + var msg PeerAddr + err := json.NewDecoder(r.Body).Decode(&msg) + if err != nil { + log.Errorf("Error decoding message: %v", err) + } + + log.Infof("received addresses: %d", len(msg.Address)) + rs.AddressesChan <- msg.Address + clientAddresses := <-rs.ClientAddressChan + + respMsg := ClientPeerAddr{ + Address: clientAddresses, + } + data, err := json.Marshal(respMsg) + if err != nil { + log.Errorf("Error marshalling message: %v", err) + return + } + + _, err = w.Write(data) + if err != nil { + log.Errorf("Error writing response: %v", err) + } +} + +type SignalClient struct { + SignalURL string +} + +func (ss SignalClient) SendAddress(addresses []string) (*ClientPeerAddr, error) { + msg := PeerAddr{ + Address: addresses, + } + data, err := json.Marshal(msg) + if err != nil { + return nil, err + } + + response, err := http.Post(ss.SignalURL, "application/json", bytes.NewBuffer(data)) + if err != nil { + return nil, err + } + + defer response.Body.Close() + + log.Debugf("wait for signal response") + var respPeerAddress ClientPeerAddr + err = json.NewDecoder(response.Body).Decode(&respPeerAddress) + if err != nil { + return nil, err + } + return &respPeerAddress, nil +} diff --git a/relay/testec2/start_msg.go b/relay/testec2/start_msg.go new file mode 100644 index 000000000..156ee52d3 --- /dev/null +++ b/relay/testec2/start_msg.go @@ -0,0 +1,37 @@ +package main + +import ( + "bytes" + "encoding/gob" + "time" + + log "github.com/sirupsen/logrus" +) + +type StartIndication struct { + Started time.Time + TransferSize int +} + +func NewStartInidication(started time.Time, transferSize int) []byte { + si := StartIndication{ + Started: started, + TransferSize: transferSize, + } + + var data bytes.Buffer + err := gob.NewEncoder(&data).Encode(si) + if err != nil { + log.Fatal("encode error:", err) + } + return data.Bytes() +} + +func DecodeStartIndication(data []byte) StartIndication { + var si StartIndication + err := gob.NewDecoder(bytes.NewReader(data)).Decode(&si) + if err != nil { + log.Fatal("decode error:", err) + } + return si +} diff --git a/relay/testec2/tun/proxy.go b/relay/testec2/tun/proxy.go new file mode 100644 index 000000000..3e671b020 --- /dev/null +++ b/relay/testec2/tun/proxy.go @@ -0,0 +1,70 @@ +package tun + +import ( + "net" + "sync/atomic" + + log "github.com/sirupsen/logrus" +) + +type Proxy struct { + Device *Device + PConn net.PacketConn + DstAddr net.Addr + shutdownFlag atomic.Bool +} + +func (p *Proxy) Start() { + go p.readFromDevice() + go p.readFromConn() +} + +func (p *Proxy) Close() { + p.shutdownFlag.Store(true) +} + +func (p *Proxy) readFromDevice() { + buf := make([]byte, 1500) + for { + n, err := p.Device.Read(buf) + if err != nil { + if p.shutdownFlag.Load() { + return + } + log.Errorf("failed to read from device: %s", err) + return + } + + _, err = p.PConn.WriteTo(buf[:n], p.DstAddr) + if err != nil { + if p.shutdownFlag.Load() { + return + } + log.Errorf("failed to write to conn: %s", err) + return + } + } +} + +func (p *Proxy) readFromConn() { + buf := make([]byte, 1500) + for { + n, _, err := p.PConn.ReadFrom(buf) + if err != nil { + if p.shutdownFlag.Load() { + return + } + log.Errorf("failed to read from conn: %s", err) + return + } + + _, err = p.Device.Write(buf[:n]) + if err != nil { + if p.shutdownFlag.Load() { + return + } + log.Errorf("failed to write to device: %s", err) + return + } + } +} diff --git a/relay/testec2/tun/tun.go b/relay/testec2/tun/tun.go new file mode 100644 index 000000000..3f8168009 --- /dev/null +++ b/relay/testec2/tun/tun.go @@ -0,0 +1,108 @@ +package tun + +import ( + "net" + + log "github.com/sirupsen/logrus" + "github.com/songgao/water" + "github.com/vishvananda/netlink" +) + +type Device struct { + Name string + IP string + PConn net.PacketConn + DstAddr net.Addr + + iFace *water.Interface + proxy *Proxy +} + +func (d *Device) Up() error { + cfg := water.Config{ + DeviceType: water.TUN, + PlatformSpecificParams: water.PlatformSpecificParams{ + Name: d.Name, + }, + } + iFace, err := water.New(cfg) + if err != nil { + return err + } + d.iFace = iFace + + err = d.assignIP() + if err != nil { + return err + } + + err = d.bringUp() + if err != nil { + return err + } + + d.proxy = &Proxy{ + Device: d, + PConn: d.PConn, + DstAddr: d.DstAddr, + } + d.proxy.Start() + return nil +} + +func (d *Device) Close() error { + if d.proxy != nil { + d.proxy.Close() + } + if d.iFace != nil { + return d.iFace.Close() + } + return nil +} + +func (d *Device) Read(b []byte) (int, error) { + return d.iFace.Read(b) +} + +func (d *Device) Write(b []byte) (int, error) { + return d.iFace.Write(b) +} + +func (d *Device) assignIP() error { + iface, err := netlink.LinkByName(d.Name) + if err != nil { + log.Errorf("failed to get TUN device: %v", err) + return err + } + + ip := net.IPNet{ + IP: net.ParseIP(d.IP), + Mask: net.CIDRMask(24, 32), + } + + addr := &netlink.Addr{ + IPNet: &ip, + } + err = netlink.AddrAdd(iface, addr) + if err != nil { + log.Errorf("failed to add IP address: %v", err) + return err + } + return nil +} + +func (d *Device) bringUp() error { + iface, err := netlink.LinkByName(d.Name) + if err != nil { + log.Errorf("failed to get device: %v", err) + return err + } + + // Bring the interface up + err = netlink.LinkSetUp(iface) + if err != nil { + log.Errorf("failed to set device up: %v", err) + return err + } + return nil +} diff --git a/relay/testec2/tun/tun_test.go b/relay/testec2/tun/tun_test.go new file mode 100644 index 000000000..081be3c92 --- /dev/null +++ b/relay/testec2/tun/tun_test.go @@ -0,0 +1,16 @@ +package tun + +import "testing" + +func TestDevice_assignIP(t *testing.T) { + d := Device{ + Name: "mytun0", + IP: "10.0.0.1", + } + err := d.Up() + if err != nil { + t.Fatalf("failed to bring up device: %v", err) + } + defer d.Close() + select {} +} diff --git a/relay/testec2/turn.go b/relay/testec2/turn.go new file mode 100644 index 000000000..0351a341e --- /dev/null +++ b/relay/testec2/turn.go @@ -0,0 +1,81 @@ +package main + +import ( + "fmt" + "net" + + "github.com/pion/logging" + "github.com/pion/turn/v3" + log "github.com/sirupsen/logrus" +) + +type TurnConn struct { + conn net.Conn + turnClient *turn.Client + relayConn net.PacketConn +} + +func (tc *TurnConn) Address() net.Addr { + return tc.relayConn.LocalAddr() +} + +func (tc *TurnConn) Close() { + _ = tc.relayConn.Close() + tc.turnClient.Close() + _ = tc.conn.Close() +} + +func AllocateTurnClient(serverAddr string) *TurnConn { + conn, err := net.Dial("tcp", serverAddr) + if err != nil { + log.Fatal(err) + } + + turnClient, err := getTurnClient(serverAddr, conn) + if err != nil { + log.Fatal(err) + } + + relayConn, err := turnClient.Allocate() + if err != nil { + log.Fatal(err) + } + + return &TurnConn{ + conn: conn, + turnClient: turnClient, + relayConn: relayConn, + } +} + +func getTurnClient(address string, conn net.Conn) (*turn.Client, error) { + // Dial TURN Server + addrStr := fmt.Sprintf("%s:%d", address, 443) + + fac := logging.NewDefaultLoggerFactory() + //fac.DefaultLogLevel = logging.LogLevelTrace + + // Start a new TURN Client and wrap our net.Conn in a STUNConn + // This allows us to simulate datagram based communication over a net.Conn + cfg := &turn.ClientConfig{ + TURNServerAddr: address, + Conn: turn.NewSTUNConn(conn), + Username: "test", + Password: "test", + LoggerFactory: fac, + } + + client, err := turn.NewClient(cfg) + if err != nil { + return nil, fmt.Errorf("failed to create TURN client for server %s: %s", addrStr, err) + } + + // Start listening on the conn provided. + err = client.Listen() + if err != nil { + client.Close() + return nil, fmt.Errorf("failed to listen on TURN client for server %s: %s", addrStr, err) + } + + return client, nil +}