From f4f09ec2e2382c6dc32e73af9b8e09c5cf01ee19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 14 Aug 2024 16:46:12 +0200 Subject: [PATCH] Fix deleted files --- client/internal/peer/worker_ice.go | 21 +-- relay/teste2c/main.go | 221 ----------------------------- relay/teste2c/relay.go | 146 ------------------- relay/teste2c/signal.go | 91 ------------ relay/teste2c/start_msg.go | 37 ----- relay/teste2c/turn.go | 156 -------------------- 6 files changed, 12 insertions(+), 660 deletions(-) delete mode 100644 relay/teste2c/main.go delete mode 100644 relay/teste2c/relay.go delete mode 100644 relay/teste2c/signal.go delete mode 100644 relay/teste2c/start_msg.go delete mode 100644 relay/teste2c/turn.go diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 26f3d04ab..b74ca7255 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "net/netip" - "runtime" "sync" "sync/atomic" "time" @@ -417,14 +416,18 @@ func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool } func candidateTypes() []ice.CandidateType { - if hasICEForceRelayConn() { - return []ice.CandidateType{ice.CandidateTypeRelay} - } - // TODO: remove this once we have refactored userspace proxy into the bind package - if runtime.GOOS == "ios" { - return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} - } - return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay} + return []ice.CandidateType{ice.CandidateTypeRelay} + /* + if hasICEForceRelayConn() { + return []ice.CandidateType{ice.CandidateTypeRelay} + } + // TODO: remove this once we have refactored userspace proxy into the bind package + if runtime.GOOS == "ios" { + return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive} + } + return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay} + + */ } func candidateTypesP2P() []ice.CandidateType { diff --git a/relay/teste2c/main.go b/relay/teste2c/main.go deleted file mode 100644 index 94e3fb032..000000000 --- a/relay/teste2c/main.go +++ /dev/null @@ -1,221 +0,0 @@ -package main - -import ( - "crypto/rand" - "flag" - "net" - "sync" - "time" - - log "github.com/sirupsen/logrus" -) - -var ( - dataSize = 1024 * 1024 * 10 // 5MB - pairs = []int{1, 3, 5, 10, 50, 100} - relaySrvAddress = "rel://relay-eu1.stage.npeer.io:80" - turnSrvAddress = "relay-eu1.stage.npeer.io:3478" - signalAddress = "http://172.20.8.77:8081" // ip address of the receiver instance - signalListenAddress = ":8081" -) - -type testResult struct { - numOfPairs int - duration time.Duration - speed float64 -} - -func seedRandomData(size int) ([]byte, error) { - token := make([]byte, size) - _, err := rand.Read(token) - if err != nil { - return nil, err - } - return token, nil -} - -func avg(transferDuration []time.Duration) (time.Duration, float64) { - var totalDuration time.Duration - for _, d := range transferDuration { - totalDuration += d - } - avgDuration := totalDuration / time.Duration(len(transferDuration)) - mbps := float64(dataSize) / avgDuration.Seconds() / 1024 / 1024 - return avgDuration, mbps -} - -func RelayReceiverMain() []testResult { - testResults := make([]testResult, 0, len(pairs)) - for _, p := range pairs { - tr := testResult{numOfPairs: p} - td := relayReceive(relaySrvAddress, p) - tr.duration, tr.speed = avg(td) - - testResults = append(testResults, tr) - } - - return testResults -} - -func RelaySenderMain() { - log.Infof("starting sender") - log.Infof("starting seed phase") - - testData, err := seedRandomData(dataSize) - if err != nil { - log.Fatalf("failed to seed random data: %s", err) - } - - log.Infof("data size: %d", len(testData)) - - for n, p := range pairs { - log.Infof("running test with %d pairs", p) - relayTransfer(relaySrvAddress, testData, p) - - // give time to prepare new receivers - if n < len(pairs)-1 { - time.Sleep(3 * time.Second) - } - } - -} - -// TRUNServerMain is the sender -// - allocate turn clients -// - send relayed addresses to signal server in batch -// - wait for signal server to send back addresses in a map -// - send test data to each address in parallel -func TRUNServerMain() { - log.Infof("starting turn test") - - log.Infof("starting seed random data: %d", dataSize) - testData, err := seedRandomData(dataSize) - if err != nil { - log.Fatalf("failed to seed random data: %s", err) - } - - ss := SignalClient{signalAddress} - - for _, p := range pairs { - log.Infof("running test with %d pairs", p) - turnConns := make(map[string]*TurnConn) - addresses := make([]string, 0, len(pairs)) - for i := 0; i < p; i++ { - tc := AllocateTurnClient(turnSrvAddress) - log.Infof("allocated turn client: %s", tc.Address().String()) - turnConns[tc.Address().String()] = tc - addresses = append(addresses, tc.Address().String()) - } - - log.Infof("send addresses via signal server: %v", addresses) - clientAddresses, err := ss.SendAddress(addresses) - if err != nil { - log.Fatalf("failed to send address: %s", err) - } - - wg := sync.WaitGroup{} - wg.Add(len(clientAddresses.Address)) - for k, v := range clientAddresses.Address { - go func(k, v string) { - log.Infof("sending test data to: %s", v) - defer wg.Done() - tc, ok := turnConns[k] - if !ok { - log.Fatalf("failed to find turn conn: %s", k) - } - addr, err := net.ResolveUDPAddr("udp", v) - if err != nil { - log.Fatalf("failed to resolve udp address: %s", err) - } - tc.WriteTestData(testData, addr) - }(k, v) - } - wg.Wait() - } -} - -func TURNClientMain() []testResult { - log.Infof("starting turn client test") - si := NewSignalService() - go func() { - log.Infof("starting signal server") - err := si.Listen(signalListenAddress) - if err != nil { - log.Errorf("failed to listen: %s", err) - } - }() - - testResults := make([]testResult, 0, len(pairs)) - for _ = range pairs { - log.Infof("waiting for addresses") - addresses := <-si.AddressesChan - log.Infof("received addresses: %d", len(addresses)) - - conns := make([]*UDPConn, 0, len(addresses)) - clientAddresses := make(map[string]string, len(addresses)) - for _, addr := range addresses { - conn, err := Dial(addr) - if err != nil { - log.Fatalf("failed to dial: %s", err) - } - log.Infof("made client UDP conn: %s", conn.LocalAddr()) - conns = append(conns, conn) - clientAddresses[addr] = conn.LocalAddr().String() - } - - // send back local addresses - log.Infof("response addresses back: %v", clientAddresses) - si.ClientAddressChan <- clientAddresses - - durations := make(chan time.Duration, len(conns)) - for _, c := range conns { - go func(c *UDPConn) { - log.Infof("start to read test data from: %s", c.RemoteAddr()) - duration := c.ReadTestData(c) - durations <- duration - _ = c.Close() - }(c) - } - - durationsList := make([]time.Duration, 0, len(conns)) - for d := range durations { - durationsList = append(durationsList, d) - if len(durationsList) == len(conns) { - close(durations) - } - } - - avgDuration, avgSpeed := avg(durationsList) - ts := testResult{ - numOfPairs: len(conns), - duration: avgDuration, - speed: avgSpeed, - } - testResults = append(testResults, ts) - } - - return testResults -} - -func main() { - log.SetLevel(log.DebugLevel) - var mode string - flag.StringVar(&mode, "mode", "sender", "sender or receiver mode") - flag.Parse() - - if mode == "receiver" { - relayResult := RelayReceiverMain() - time.Sleep(3 * time.Second) - turnResults := TURNClientMain() - - for i := 0; i < len(turnResults); i++ { - log.Infof("pairs: %d, relay duration: %s, relay speed: %.2f MB/s", relayResult[i].numOfPairs, relayResult[i].duration, relayResult[i].speed) - log.Infof("pairs: %d, turn duration: %s, turn speed: %.2f MB/s", turnResults[i].numOfPairs, turnResults[i].duration, turnResults[i].speed) - } - } else { - RelaySenderMain() - // grant time for receiver to start - time.Sleep(6 * time.Second) - TRUNServerMain() - } -} diff --git a/relay/teste2c/relay.go b/relay/teste2c/relay.go deleted file mode 100644 index 0e370dff8..000000000 --- a/relay/teste2c/relay.go +++ /dev/null @@ -1,146 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net" - "sync" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/relay/auth/hmac" - "github.com/netbirdio/netbird/relay/client" -) - -var ( - hmacTokenStore = &hmac.TokenStore{} -) - -func relayTransfer(serverConnURL string, testData []byte, peerPairs int) { - ctx := context.Background() - - clientsSender := make([]*client.Client, peerPairs) - for i := 0; i < cap(clientsSender); i++ { - c := client.NewClient(ctx, serverConnURL, hmacTokenStore, "sender-"+fmt.Sprint(i)) - if err := c.Connect(); err != nil { - log.Fatalf("failed to connect to server: %s", err) - } - clientsSender[i] = c - } - - connsSender := make([]net.Conn, 0, peerPairs) - for i := 0; i < len(clientsSender); i++ { - conn, err := clientsSender[i].OpenConn("receiver-" + fmt.Sprint(i)) - if err != nil { - log.Fatalf("failed to bind channel: %s", err) - } - connsSender = append(connsSender, conn) - } - - defer func() { - for i := 0; i < len(connsSender); i++ { - err := connsSender[i].Close() - if err != nil { - log.Errorf("failed to close connection: %s", err) - } - } - }() - - wg := sync.WaitGroup{} - var writeErr error - for i := 0; i < len(connsSender); i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - - si := NewStartInidication(time.Now(), len(testData)) - _, err := connsSender[i].Write(si) - if err != nil { - log.Errorf("failed to write to channel: %s", err) - return - } - log.Infof("sent start indication") - - pieceSize := 1024 - testDataLen := len(testData) - - for j := 0; j < testDataLen; j += pieceSize { - end := j + pieceSize - if end > testDataLen { - end = testDataLen - } - _, writeErr = connsSender[i].Write(testData[j:end]) - if writeErr != nil { - log.Errorf("failed to write to channel: %s", writeErr) - return - } - } - }(i) - } - wg.Wait() -} - -func relayReceive(serverConnURL string, peerPairs int) []time.Duration { - clientsReceiver := make([]*client.Client, peerPairs) - for i := 0; i < cap(clientsReceiver); i++ { - c := client.NewClient(context.Background(), serverConnURL, hmacTokenStore, "receiver-"+fmt.Sprint(i)) - err := c.Connect() - if err != nil { - log.Fatalf("failed to connect to server: %s", err) - } - clientsReceiver[i] = c - } - - connsReceiver := make([]net.Conn, 0, peerPairs) - for i := 0; i < len(clientsReceiver); i++ { - conn, err := clientsReceiver[i].OpenConn("sender-" + fmt.Sprint(i)) - if err != nil { - log.Fatalf("failed to bind channel: %s", err) - } - connsReceiver = append(connsReceiver, conn) - } - - defer func() { - for i := 0; i < len(connsReceiver); i++ { - if err := connsReceiver[i].Close(); err != nil { - log.Errorf("failed to close connection: %s", err) - } - } - }() - - var transferDuration []time.Duration - wg := sync.WaitGroup{} - for i := 0; i < peerPairs; i++ { - wg.Add(1) - go func(i int) { - defer wg.Done() - buf := make([]byte, 8192) - - n, readErr := connsReceiver[i].Read(buf) - if readErr != nil { - log.Errorf("failed to read from channel: %s", readErr) - return - } - - si := DecodeStartIndication(buf[:n]) - log.Infof("received start indication: %v", si) - - rcv := 0 - for receivedSize := 0; receivedSize < si.TransferSize; { - n, readErr = connsReceiver[i].Read(buf) - if readErr != nil { - log.Errorf("failed to read from channel: %s", readErr) - return - } - - receivedSize += n - rcv += n - } - transferDuration = append(transferDuration, time.Since(si.Started)) - }(i) - } - - wg.Wait() - return transferDuration -} diff --git a/relay/teste2c/signal.go b/relay/teste2c/signal.go deleted file mode 100644 index 4867d6dd0..000000000 --- a/relay/teste2c/signal.go +++ /dev/null @@ -1,91 +0,0 @@ -package main - -import ( - "bytes" - "encoding/json" - "net/http" - - log "github.com/sirupsen/logrus" -) - -type PeerAddr struct { - Address []string -} - -type ClientPeerAddr struct { - Address map[string]string -} - -type Signal struct { - AddressesChan chan []string - ClientAddressChan chan map[string]string -} - -func NewSignalService() *Signal { - return &Signal{ - AddressesChan: make(chan []string, 0), - ClientAddressChan: make(chan map[string]string, 0), - } -} - -func (rs *Signal) Listen(listenAdddr string) error { - http.HandleFunc("/", rs.onNewAddresses) - return http.ListenAndServe(listenAdddr, nil) -} - -func (rs *Signal) onNewAddresses(w http.ResponseWriter, r *http.Request) { - var msg PeerAddr - err := json.NewDecoder(r.Body).Decode(&msg) - if err != nil { - log.Errorf("Error decoding message: %v", err) - } - - log.Infof("received addresses: %v", msg.Address) - rs.AddressesChan <- msg.Address - clientAddresses := <-rs.ClientAddressChan - log.Infof("Sending back addresses: %v", clientAddresses) - - respMsg := ClientPeerAddr{ - Address: clientAddresses, - } - data, err := json.Marshal(respMsg) - if err != nil { - log.Errorf("Error marshalling message: %v", err) - return - } - - _, err = w.Write(data) - if err != nil { - log.Errorf("Error writing response: %v", err) - } -} - -// "http://localhost:8080/address" -type SignalClient struct { - SignalAddress string -} - -func (ss SignalClient) SendAddress(addresses []string) (*ClientPeerAddr, error) { - msg := PeerAddr{ - Address: addresses, - } - data, err := json.Marshal(msg) - if err != nil { - return nil, err - } - - response, err := http.Post(ss.SignalAddress, "application/json", bytes.NewBuffer(data)) - if err != nil { - return nil, err - } - - defer response.Body.Close() - - log.Debugf("wait for signal response") - var respPeerAddress ClientPeerAddr - err = json.NewDecoder(response.Body).Decode(&respPeerAddress) - if err != nil { - return nil, err - } - return &respPeerAddress, nil -} diff --git a/relay/teste2c/start_msg.go b/relay/teste2c/start_msg.go deleted file mode 100644 index 156ee52d3..000000000 --- a/relay/teste2c/start_msg.go +++ /dev/null @@ -1,37 +0,0 @@ -package main - -import ( - "bytes" - "encoding/gob" - "time" - - log "github.com/sirupsen/logrus" -) - -type StartIndication struct { - Started time.Time - TransferSize int -} - -func NewStartInidication(started time.Time, transferSize int) []byte { - si := StartIndication{ - Started: started, - TransferSize: transferSize, - } - - var data bytes.Buffer - err := gob.NewEncoder(&data).Encode(si) - if err != nil { - log.Fatal("encode error:", err) - } - return data.Bytes() -} - -func DecodeStartIndication(data []byte) StartIndication { - var si StartIndication - err := gob.NewDecoder(bytes.NewReader(data)).Decode(&si) - if err != nil { - log.Fatal("decode error:", err) - } - return si -} diff --git a/relay/teste2c/turn.go b/relay/teste2c/turn.go deleted file mode 100644 index 355511d60..000000000 --- a/relay/teste2c/turn.go +++ /dev/null @@ -1,156 +0,0 @@ -package main - -import ( - "fmt" - "net" - "time" - - "github.com/pion/logging" - "github.com/pion/turn/v3" - log "github.com/sirupsen/logrus" -) - -type TurnConn struct { - conn net.Conn - turnClient *turn.Client - relayConn net.PacketConn -} - -func (tc *TurnConn) Address() net.Addr { - return tc.relayConn.LocalAddr() -} - -func (tc *TurnConn) Close() { - _ = tc.relayConn.Close() - tc.turnClient.Close() - _ = tc.conn.Close() -} - -func AllocateTurnClient(serverAddr string) *TurnConn { - conn, err := net.Dial("tcp", serverAddr) - if err != nil { - log.Fatal(err) - } - - turnClient, err := getTurnClient(serverAddr, conn) - if err != nil { - log.Fatal(err) - } - - relayConn, err := turnClient.Allocate() - if err != nil { - log.Fatal(err) - } - - return &TurnConn{ - conn: conn, - turnClient: turnClient, - relayConn: relayConn, - } -} - -func (tc *TurnConn) WriteTestData(testData []byte, dstAddr net.Addr) { - log.Infof("write test data to: %s", dstAddr) - testDataSize := len(testData) - si := NewStartInidication(time.Now(), testDataSize) - _, err := tc.relayConn.WriteTo(si, dstAddr) - if err != nil { - log.Errorf("failed to write to: %s, %s", dstAddr, err) - return - } - - pieceSize := 1024 - ackBuff := make([]byte, 1) - pipelineSize := 10 - for j := 0; j < testDataSize; j += pieceSize { - end := j + pieceSize - if end > testDataSize { - end = testDataSize - } - _, err := tc.relayConn.WriteTo(testData[j:end], dstAddr) - if err != nil { - log.Fatalf("failed to write to channel: %s", err) - } - if pipelineSize == 0 { - _, _, _ = tc.relayConn.ReadFrom(ackBuff) - } else { - pipelineSize-- - } - } -} - -func getTurnClient(address string, conn net.Conn) (*turn.Client, error) { - // Dial TURN Server - addrStr := fmt.Sprintf("%s:%d", address, 443) - - fac := logging.NewDefaultLoggerFactory() - //fac.DefaultLogLevel = logging.LogLevelTrace - - // Start a new TURN Client and wrap our net.Conn in a STUNConn - // This allows us to simulate datagram based communication over a net.Conn - cfg := &turn.ClientConfig{ - TURNServerAddr: address, - Conn: turn.NewSTUNConn(conn), - Username: "test", - Password: "test", - LoggerFactory: fac, - } - - client, err := turn.NewClient(cfg) - if err != nil { - return nil, fmt.Errorf("failed to create TURN client for server %s: %s", addrStr, err) - } - - // Start listening on the conn provided. - err = client.Listen() - if err != nil { - client.Close() - return nil, fmt.Errorf("failed to listen on TURN client for server %s: %s", addrStr, err) - } - - return client, nil -} - -type UDPConn struct { - net.Conn -} - -func Dial(addr string) (*UDPConn, error) { - conn, err := net.Dial("udp", addr) - if err != nil { - return nil, err - } - return &UDPConn{conn}, nil -} - -func (c UDPConn) ReadTestData(c2 *UDPConn) time.Duration { - log.Infof("reading test data from TURN relay") - var ( - tb int - ack = make([]byte, 1) - ) - buff := make([]byte, 8192) - n, err := c.Conn.Read(buff) - if err != nil { - log.Errorf("failed to read from channel: %s", err) - return 0 - } - - si := DecodeStartIndication(buff[:n]) - log.Infof("received start indication: %v", si) - - for { - n, e := c.Conn.Read(buff) - if e != nil { - return 0 - } - tb += n - _, _ = c.Conn.Write(ack) - - if tb >= si.TransferSize { - break - } - } - - return time.Since(si.Started) -}