diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 29218c03a..d4a0dc5bf 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -80,7 +80,8 @@ type Conn struct { config ConnConfig statusRecorder *Status wgProxyFactory *wgproxy.Factory - wgProxy wgproxy.Proxy + wgProxyICE wgproxy.Proxy + wgProxyRelay wgproxy.Proxy signaler *Signaler allowedIPsIP string handshaker *Handshaker @@ -99,6 +100,8 @@ type Conn struct { afterRemovePeerHooks []AfterRemovePeerHookFunc currentConnType ConnPriority + + endpointRelay *net.UDPAddr } // NewConn creates a new not opened Conn to the remote peer. @@ -147,7 +150,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu // Open opens connection to the remote peer // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. -// todo implement on disconnected event from ICE and relay too. func (conn *Conn) Open() { conn.log.Debugf("open connection to peer") @@ -176,12 +178,20 @@ func (conn *Conn) Close() { defer conn.mu.Unlock() conn.ctxCancel() - if conn.wgProxy != nil { - err := conn.wgProxy.CloseConn() + if conn.wgProxyRelay != nil { + err := conn.wgProxyRelay.CloseConn() if err != nil { - conn.log.Errorf("failed to close wg proxy: %v", err) + conn.log.Errorf("failed to close wg proxy for relay: %v", err) } - conn.wgProxy = nil + conn.wgProxyRelay = nil + } + + if conn.wgProxyICE != nil { + err := conn.wgProxyICE.CloseConn() + if err != nil { + conn.log.Errorf("failed to close wg proxy for ice: %v", err) + } + conn.wgProxyICE = nil } // todo: is it problem if we try to remove a peer what is never existed? @@ -277,6 +287,7 @@ func (conn *Conn) GetKey() string { func (conn *Conn) onWorkerICEStateChanged(newState ConnStatus) { conn.mu.Lock() defer conn.mu.Unlock() + log.Debugf("ICE connection state changed to %s", newState) defer func() { conn.statusICE = newState }() @@ -289,6 +300,16 @@ func (conn *Conn) onWorkerICEStateChanged(newState ConnStatus) { return } + if conn.endpointRelay != nil { + err := conn.configureWGEndpoint(conn.endpointRelay) + if err != nil { + conn.log.Errorf("failed to switch back to relay conn: %v", err) + } + // todo update status to relay related things + log.Debugf("switched back to relay connection") + return + } + if newState > conn.statusICE { peerState := State{ PubKey: conn.config.Key, @@ -307,6 +328,8 @@ func (conn *Conn) onWorkerRelayStateChanged(newState ConnStatus) { conn.statusRelay = newState }() + conn.log.Debugf("Relay connection state changed to %s", newState) + if conn.statusICE == StatusConnected { return } @@ -363,7 +386,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { } } - err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey) + err = conn.configureWGEndpoint(endpointUdpAddr) if err != nil { if err := wgProxy.CloseConn(); err != nil { conn.log.Warnf("Failed to close relay connection: %v", err) @@ -371,14 +394,14 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.log.Errorf("Failed to update wg peer configuration: %v", err) return } + conn.endpointRelay = endpointUdpAddr - if conn.wgProxy != nil { - if err := conn.wgProxy.CloseConn(); err != nil { + if conn.wgProxyRelay != nil { + if err := conn.wgProxyRelay.CloseConn(); err != nil { conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) } } - conn.wgProxy = wgProxy - + conn.wgProxyRelay = wgProxy conn.currentConnType = connPriorityRelay peerState := State{ @@ -408,6 +431,8 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon if conn.currentConnType != 0 { conn.log.Infof("update connection to ICE type") + } else { + conn.log.Infof("set ICE to active connection") } var ( @@ -417,7 +442,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon if iceConnInfo.RelayedOnLocal { conn.log.Debugf("setup ice turn connection") wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx) - ep, err := conn.wgProxy.AddTurnConn(iceConnInfo.RemoteConn) + ep, err := conn.wgProxyICE.AddTurnConn(iceConnInfo.RemoteConn) if err != nil { conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) return @@ -448,12 +473,12 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon return } - if conn.wgProxy != nil { - if err := conn.wgProxy.CloseConn(); err != nil { + if conn.wgProxyICE != nil { + if err := conn.wgProxyICE.CloseConn(); err != nil { conn.log.Warnf("failed to close depracated wg proxy conn: %v", err) } } - conn.wgProxy = wgProxy + conn.wgProxyICE = wgProxy conn.currentConnType = priority @@ -469,6 +494,15 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon conn.updateStatus(peerState, iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } +func (conn *Conn) configureWGEndpoint(addr *net.UDPAddr) error { + return conn.config.WgConfig.WgInterface.UpdatePeer( + conn.config.WgConfig.RemoteKey, + conn.config.WgConfig.AllowedIps, + defaultWgKeepAlive, + addr, + conn.config.WgConfig.PreSharedKey, + ) +} func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) { peerState.PubKey = conn.config.Key peerState.ConnStatus = StatusConnected diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index 7dbb06b5a..78ccf9724 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -2,15 +2,19 @@ package peer import ( "context" + "os" "sync" "testing" "time" "github.com/magiconair/properties/assert" + log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/client/internal/stdnet" "github.com/netbirdio/netbird/client/internal/wgproxy" "github.com/netbirdio/netbird/iface" + relayClient "github.com/netbirdio/netbird/relay/client" + "github.com/netbirdio/netbird/util" ) var connConf = ConnConfig{ @@ -23,6 +27,12 @@ var connConf = ConnConfig{ }, } +func TestMain(m *testing.M) { + _ = util.InitLog("trace", "console") + code := m.Run() + os.Exit(code) +} + func TestNewConn_interfaceFilter(t *testing.T) { ignore := []string{iface.WgInterfaceDefault, "tun0", "zt", "ZeroTier", "utun", "wg", "ts", "Tailscale", "tailscale"} @@ -158,3 +168,50 @@ func TestConn_Status(t *testing.T) { }) } } + +func TestConn_Switch(t *testing.T) { + ctx := context.Background() + + wgProxyFactory := wgproxy.NewFactory(ctx, connConf.LocalWgPort) + connConfAlice := ConnConfig{ + Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + Timeout: time.Second, + LocalWgPort: 51820, + ICEConfig: ICEConfig{ + InterfaceBlackList: nil, + }, + WgConfig: WgConfig{ + WgListenPort: 51820, + RemoteKey: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + AllowedIps: "172.16.254.0/16", + }, + } + relayManagerAlice := relayClient.NewManager(ctx, "127.0.0.1:1234", connConf.LocalKey) + connAlice, err := NewConn(ctx, connConfAlice, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, relayManagerAlice) + if err != nil { + log.Fatalf("failed to create conn: %v", err) + } + connAlice.Open() + + connConfbob := ConnConfig{ + Key: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + LocalKey: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + Timeout: time.Second, + LocalWgPort: 51820, + ICEConfig: ICEConfig{ + InterfaceBlackList: nil, + }, + WgConfig: WgConfig{ + WgListenPort: 51820, + RemoteKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=", + AllowedIps: "172.16.254.0/16", + }, + } + relayManagerBob := relayClient.NewManager(ctx, "127.0.0.1:1234", connConf.LocalKey) + connBob, err := NewConn(ctx, connConfbob, NewRecorder("https://mgm"), wgProxyFactory, nil, nil, relayManagerBob) + if err != nil { + log.Fatalf("failed to create conn: %v", err) + } + connBob.Open() +} diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 8441fe64a..ae96c858f 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -87,6 +87,7 @@ func (h *Handshaker) Handshake(args HandshakeArgs) (*OfferAnswer, error) { h.mu.Lock() defer h.mu.Unlock() + h.log.Infof("start handshake with remote peer") h.handshakeArgs = args cachedOfferAnswer, ok := h.cachedHandshake() @@ -195,6 +196,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) { case remoteOfferAnswer := <-h.remoteAnswerCh: return &remoteOfferAnswer, nil case <-timeout.C: + h.log.Debugf("handshake timeout") return nil, NewConnectionTimeoutError(h.config.Key, h.config.Timeout) case <-h.ctx.Done(): // closed externally diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 601f88934..611bcd72d 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -113,6 +113,7 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, config // It is trying to reconnection in a loop until the context is canceled. // In case of success connection it will call the onICEConnReady callback. func (w *WorkerICE) SetupICEConnection(hasRelayOnLocally bool) { + time.Sleep(20 * time.Second) for { if !w.waitForReconnectTry() { return diff --git a/client/internal/wgproxy/factory_linux.go b/client/internal/wgproxy/factory_linux.go index 79f5cd548..ba1ef8c45 100644 --- a/client/internal/wgproxy/factory_linux.go +++ b/client/internal/wgproxy/factory_linux.go @@ -8,6 +8,17 @@ import ( func NewFactory(ctx context.Context, wgPort int) *Factory { f := &Factory{wgPort: wgPort} + // todo: put it back + /* + ebpfProxy := NewWGEBPFProxy(ctx, wgPort) + err := ebpfProxy.listen() + if err != nil { + log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err) + return f + } + f.ebpfProxy = ebpfProxy + + */ return f } diff --git a/relay/client/manager.go b/relay/client/manager.go index a57e3ce55..1810b8167 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -91,10 +91,10 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { } if !foreign { - log.Debugf("open connection to permanent server: %s", peerKey) + log.Debugf("open peer connection via permanent server: %s", peerKey) return m.relayClient.OpenConn(peerKey) } else { - log.Debugf("open connection to foreign server: %s", serverAddress) + log.Debugf("open peer connection via foreign server: %s", serverAddress) return m.openConnVia(serverAddress, peerKey) } }