From e7098d7b7757fe86d7f286d830bff6e0fd56d568 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 14 Mar 2025 13:30:24 +0100 Subject: [PATCH] Block close call --- client/internal/conn_mgr.go | 12 +-- client/internal/engine.go | 2 +- client/internal/peer/conn.go | 134 +++++++++++++--------------- client/internal/peer/guard/guard.go | 25 ++---- client/internal/peer/handshaker.go | 12 ++- client/internal/peerstore/store.go | 5 +- 6 files changed, 85 insertions(+), 105 deletions(-) diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index 345064f37..cea35d86d 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -30,6 +30,7 @@ type ConnMgr struct { connStateListener *peer.ConnectionListener wg sync.WaitGroup + ctx context.Context ctxCancel context.CancelFunc } @@ -53,6 +54,7 @@ func (e *ConnMgr) Start(parentCtx context.Context) { } ctx, cancel := context.WithCancel(parentCtx) + e.ctx = ctx e.ctxCancel = cancel e.wg.Add(1) @@ -75,7 +77,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { } if !e.isStartedWithLazyMgr() { - conn.Open() + conn.Open(e.ctx) return } @@ -88,13 +90,13 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) if err != nil { conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) - conn.Open() + conn.Open(e.ctx) return } if excluded { conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection") - conn.Open() + conn.Open(e.ctx) return } @@ -113,7 +115,7 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { } if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found { - conn.Open() + conn.Open(e.ctx) } return conn, true } @@ -151,7 +153,7 @@ func (e *ConnMgr) receiveLazyEvents(ctx context.Context) { case <-ctx.Done(): return case peerID := <-e.lazyConnMgr.OnActive: - e.peerStore.PeerConnOpen(peerID) + e.peerStore.PeerConnOpen(e.ctx, peerID) case peerID := <-e.lazyConnMgr.Idle: // todo consider to use engine lock e.peerStore.PeerConnClose(peerID) diff --git a/client/internal/engine.go b/client/internal/engine.go index fc71e392b..9ce0d7997 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1198,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer }, } - peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher) + peerConn, err := peer.NewConn(config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher) if err != nil { return nil, err } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 9a2f9a70a..7c1d12490 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -93,8 +93,9 @@ type Conn struct { config ConnConfig statusRecorder *Status signaler *Signaler + iFaceDiscover stdnet.ExternalIFaceDiscover relayManager *relayClient.Manager - handshaker *Handshaker + srWatcher *guard.SRWatcher onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string) onDisconnected func(remotePeer string) @@ -114,74 +115,81 @@ type Conn struct { wgProxyICE wgproxy.Proxy wgProxyRelay wgproxy.Proxy + handshaker *Handshaker guard *guard.Guard semaphore *semaphoregroup.SemaphoreGroup + wg sync.WaitGroup peerConnDispatcher *ConnectionDispatcher } // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) { +func NewConn(config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) { if len(config.WgConfig.AllowedIps) == 0 { return nil, fmt.Errorf("allowed IPs is empty") } - ctx, ctxCancel := context.WithCancel(engineCtx) connLog := log.WithField("peer", config.Key) var conn = &Conn{ Log: connLog, - ctx: ctx, - ctxCancel: ctxCancel, config: config, statusRecorder: statusRecorder, signaler: signaler, + iFaceDiscover: iFaceDiscover, relayManager: relayManager, + srWatcher: srWatcher, statusRelay: NewAtomicConnStatus(), statusICE: NewAtomicConnStatus(), semaphore: semaphore, peerConnDispatcher: peerConnDispatcher, } - ctrl := isController(config) - conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager) - - relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() - workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) - if err != nil { - return nil, err - } - conn.workerICE = workerICE - - conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay) - - conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer) - if os.Getenv("NB_FORCE_RELAY") != "true" { - conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer) - } - - conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher) - return conn, nil } // Open opens connection to the remote peer // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. -func (conn *Conn) Open() { - conn.semaphore.Add(conn.ctx) +func (conn *Conn) Open(engineCtx context.Context) error { + conn.semaphore.Add(engineCtx) conn.mu.Lock() defer conn.mu.Unlock() if conn.opened { - conn.semaphore.Done(conn.ctx) - return + conn.semaphore.Done(engineCtx) + return nil } - conn.opened = true - go conn.handshaker.Listen() + conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx) + + ctrl := isController(conn.config) + + conn.workerRelay = NewWorkerRelay(conn.Log, ctrl, conn.config, conn, conn.relayManager) + + relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() + workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally) + if err != nil { + return err + } + conn.workerICE = workerICE + + conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay) + + conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer) + if os.Getenv("NB_FORCE_RELAY") != "true" { + conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer) + } + + conn.guard = guard.NewGuard(conn.Log, ctrl, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher) + + conn.wg.Add(1) + go func() { + defer conn.wg.Done() + conn.handshaker.Listen(conn.ctx) + }() peerState := State{ PubKey: conn.config.Key, @@ -189,25 +197,28 @@ func (conn *Conn) Open() { ConnStatus: StatusConnecting, Mux: new(sync.RWMutex), } - err := conn.statusRecorder.UpdatePeerState(peerState) - if err != nil { + if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil { conn.Log.Warnf("error while updating the state err: %v", err) } - go conn.startHandshakeAndReconnect(conn.ctx) -} + conn.wg.Add(1) + go func() { + defer conn.wg.Done() + conn.waitInitialRandomSleepTime(conn.ctx) + conn.semaphore.Done(conn.ctx) -func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) { - defer conn.semaphore.Done(conn.ctx) - conn.waitInitialRandomSleepTime(ctx) + if err := conn.handshaker.sendOffer(); err != nil { + conn.Log.Errorf("failed to send initial offer: %v", err) + } - err := conn.handshaker.sendOffer() - if err != nil { - conn.Log.Errorf("failed to send initial offer: %v", err) - } - - go conn.guard.Start(ctx) - go conn.listenGuardEvent(ctx) + conn.wg.Add(1) + go func() { + conn.guard.Start(conn.ctx, conn.onGuardEvent) + conn.wg.Done() + }() + }() + conn.opened = true + return nil } // Close closes this peer Conn issuing a close event to the Conn closeCh @@ -255,6 +266,7 @@ func (conn *Conn) Close() { conn.setStatusToDisconnected() conn.opened = false + conn.wg.Wait() } // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise @@ -316,10 +328,6 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC conn.mu.Lock() defer conn.mu.Unlock() - if conn.ctx.Err() != nil { - return - } - if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) { conn.Log.Errorf("remote ICE connection is nil") return @@ -394,10 +402,6 @@ func (conn *Conn) onICEStateDisconnected() { conn.mu.Lock() defer conn.mu.Unlock() - if conn.ctx.Err() != nil { - return - } - conn.Log.Tracef("ICE connection state changed to disconnected") if conn.wgProxyICE != nil { @@ -445,13 +449,6 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) { conn.mu.Lock() defer conn.mu.Unlock() - if conn.ctx.Err() != nil { - if err := rci.relayedConn.Close(); err != nil { - conn.Log.Warnf("failed to close unnecessary relayed connection: %v", err) - } - return - } - conn.Log.Debugf("Relay connection has been established, setup the WireGuard") wgProxy, err := conn.newProxy(rci.relayedConn) @@ -498,10 +495,6 @@ func (conn *Conn) onRelayDisconnected() { conn.mu.Lock() defer conn.mu.Unlock() - if conn.ctx.Err() != nil { - return - } - conn.Log.Debugf("relay connection is disconnected") if conn.currentConnPriority == connPriorityRelay { @@ -535,17 +528,10 @@ func (conn *Conn) onRelayDisconnected() { } } -func (conn *Conn) listenGuardEvent(ctx context.Context) { - for { - select { - case <-conn.guard.Reconnect: - conn.Log.Debugf("send offer to peer") - if err := conn.handshaker.SendOffer(); err != nil { - conn.Log.Errorf("failed to send offer: %v", err) - } - case <-ctx.Done(): - return - } +func (conn *Conn) onGuardEvent() { + conn.Log.Debugf("send offer to peer") + if err := conn.handshaker.SendOffer(); err != nil { + conn.Log.Errorf("failed to send offer: %v", err) } } diff --git a/client/internal/peer/guard/guard.go b/client/internal/peer/guard/guard.go index 1fc2b4a4a..c97b2fc6d 100644 --- a/client/internal/peer/guard/guard.go +++ b/client/internal/peer/guard/guard.go @@ -46,11 +46,11 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, } } -func (g *Guard) Start(ctx context.Context) { +func (g *Guard) Start(ctx context.Context, eventCallback func()) { if g.isController { - g.reconnectLoopWithRetry(ctx) + g.reconnectLoopWithRetry(ctx, eventCallback) } else { - g.listenForDisconnectEvents(ctx) + g.listenForDisconnectEvents(ctx, eventCallback) } } @@ -70,7 +70,7 @@ func (g *Guard) SetICEConnDisconnected() { // reconnectLoopWithRetry periodically check (max 30 min) the connection status. // Try to send offer while the P2P is not established or while the Relay is not connected if is it supported -func (g *Guard) reconnectLoopWithRetry(ctx context.Context) { +func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) { waitForInitialConnectionTry(ctx) srReconnectedChan := g.srWatcher.NewListener() @@ -93,7 +93,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) { } if !g.isConnectedOnAllWay() { - g.triggerOfferSending() + callback() } case <-g.relayedConnDisconnected: @@ -125,7 +125,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) { // when the connection is lost. It will try to establish a connection only once time if before the connection was established // It track separately the ice and relay connection status. Just because a lower priority connection reestablished it does not // mean that to switch to it. We always force to use the higher priority connection. -func (g *Guard) listenForDisconnectEvents(ctx context.Context) { +func (g *Guard) listenForDisconnectEvents(ctx context.Context, callback func()) { srReconnectedChan := g.srWatcher.NewListener() defer g.srWatcher.RemoveListener(srReconnectedChan) @@ -134,12 +134,12 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) { select { case <-g.relayedConnDisconnected: g.log.Debugf("Relay connection changed, triggering reconnect") - g.triggerOfferSending() + callback() case <-g.iCEConnDisconnected: g.log.Debugf("ICE state changed, try to send new offer") - g.triggerOfferSending() + callback() case <-srReconnectedChan: - g.triggerOfferSending() + callback() case <-ctx.Done(): g.log.Debugf("context is done, stop reconnect loop") return @@ -164,13 +164,6 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker { return ticker } -func (g *Guard) triggerOfferSending() { - select { - case g.Reconnect <- struct{}{}: - default: - } -} - // Give chance to the peer to establish the initial connection. // With it, we can decrease to send necessary offer func waitForInitialConnectionTry(ctx context.Context) { diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 545f81966..64f412bcb 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -43,7 +43,6 @@ type OfferAnswer struct { type Handshaker struct { mu sync.Mutex - ctx context.Context log *log.Entry config ConnConfig signaler *Signaler @@ -57,9 +56,8 @@ type Handshaker struct { remoteAnswerCh chan OfferAnswer } -func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker { +func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker { return &Handshaker{ - ctx: ctx, log: log, config: config, signaler: signaler, @@ -74,10 +72,10 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn h.onNewOfferListeners = append(h.onNewOfferListeners, offer) } -func (h *Handshaker) Listen() { +func (h *Handshaker) Listen(ctx context.Context) { for { h.log.Debugf("wait for remote offer confirmation") - remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation() + remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx) if err != nil { var connectionClosedError *ConnectionClosedError if errors.As(err, &connectionClosedError) { @@ -127,7 +125,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool { } } -func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) { +func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) { select { case remoteOfferAnswer := <-h.remoteOffersCh: // received confirmation from the remote peer -> ready to proceed @@ -138,7 +136,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) { return &remoteOfferAnswer, nil case remoteOfferAnswer := <-h.remoteAnswerCh: return &remoteOfferAnswer, nil - case <-h.ctx.Done(): + case <-ctx.Done(): // closed externally return nil, NewConnectionClosedError(h.config.Key) } diff --git a/client/internal/peerstore/store.go b/client/internal/peerstore/store.go index e1e37b907..3fffa380d 100644 --- a/client/internal/peerstore/store.go +++ b/client/internal/peerstore/store.go @@ -1,6 +1,7 @@ package peerstore import ( + "context" "net/netip" "sync" @@ -79,7 +80,7 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) { return p, true } -func (s *Store) PeerConnOpen(pubKey string) { +func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) { s.peerConnsMu.RLock() defer s.peerConnsMu.RUnlock() @@ -88,7 +89,7 @@ func (s *Store) PeerConnOpen(pubKey string) { return } // this can be blocked because of the connect open limiter semaphore - p.Open() + p.Open(ctx) } func (s *Store) PeerConnClose(pubKey string) {