From 7e422b1d76919fd970cab61f69330f1bf7eeaec7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Fri, 14 Mar 2025 14:20:55 +0100 Subject: [PATCH] fix --- client/internal/conn_mgr.go | 58 +++++++++------ client/internal/lazyconn/manager/manager.go | 82 ++++++++------------- client/internal/peer/conn.go | 1 + 3 files changed, 65 insertions(+), 76 deletions(-) diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index cea35d86d..2f036bfbe 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -29,6 +29,7 @@ type ConnMgr struct { connStateListener *peer.ConnectionListener + mu sync.Mutex wg sync.WaitGroup ctx context.Context ctxCancel context.CancelFunc @@ -57,13 +58,10 @@ func (e *ConnMgr) Start(parentCtx context.Context) { e.ctx = ctx e.ctxCancel = cancel - e.wg.Add(1) - go e.receiveLazyEvents(ctx) - e.wg.Add(1) go func() { defer e.wg.Done() - e.lazyConnMgr.Start(ctx) + e.lazyConnMgr.Start(ctx, e.onActive, e.onInactive) }() } @@ -72,12 +70,17 @@ func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) { } func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { + e.mu.Lock() + defer e.mu.Unlock() + if success := e.peerStore.AddPeerConn(peerKey, conn); !success { return true } if !e.isStartedWithLazyMgr() { - conn.Open(e.ctx) + if err := conn.Open(e.ctx); err != nil { + conn.Log.Errorf("failed to open connection: %v", err) + } return } @@ -90,13 +93,17 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) if err != nil { conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) - conn.Open(e.ctx) + if err := conn.Open(e.ctx); err != nil { + conn.Log.Errorf("failed to open connection: %v", err) + } return } if excluded { conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection") - conn.Open(e.ctx) + if err := conn.Open(e.ctx); err != nil { + conn.Log.Errorf("failed to open connection: %v", err) + } return } @@ -105,6 +112,9 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { } func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { + e.mu.Lock() + defer e.mu.Unlock() + conn, ok := e.peerStore.PeerConn(peerKey) if !ok { return nil, false @@ -115,12 +125,17 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { } if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found { - conn.Open(e.ctx) + if err := conn.Open(e.ctx); err != nil { + conn.Log.Errorf("failed to open connection: %v", err) + } } return conn, true } func (e *ConnMgr) RemovePeerConn(peerKey string) { + e.mu.Lock() + defer e.mu.Unlock() + conn, ok := e.peerStore.Remove(peerKey) if !ok { return @@ -141,25 +156,22 @@ func (e *ConnMgr) Close() { } e.ctxCancel() - e.lazyConnMgr.Close() e.wg.Wait() e.lazyConnMgr = nil } -func (e *ConnMgr) receiveLazyEvents(ctx context.Context) { - defer e.wg.Done() - for { - select { - case <-ctx.Done(): - return - case peerID := <-e.lazyConnMgr.OnActive: - e.peerStore.PeerConnOpen(e.ctx, peerID) - case peerID := <-e.lazyConnMgr.Idle: - // todo consider to use engine lock - e.peerStore.PeerConnClose(peerID) - e.lazyConnMgr.RunActivityMonitor(peerID) - } - } +func (e *ConnMgr) onActive(peerID string) { + e.mu.Lock() + defer e.mu.Unlock() + + e.peerStore.PeerConnOpen(e.ctx, peerID) +} + +func (e *ConnMgr) onInactive(peerID string) { + e.mu.Lock() + defer e.mu.Unlock() + + e.peerStore.PeerConnClose(peerID) } func (e *ConnMgr) isStartedWithLazyMgr() bool { diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index a2c27d0d0..f77676e3e 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -15,9 +15,6 @@ import ( // Manager manages lazy connections // This is not a thread safe implementation, do not call exported functions concurrently type Manager struct { - OnActive chan string - Idle chan string - connStateDispatcher *peer.ConnectionDispatcher connStateListener *peer.ConnectionListener @@ -35,9 +32,6 @@ type Manager struct { func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager { m := &Manager{ - OnActive: make(chan string, 1), - Idle: make(chan string, 1), - connStateDispatcher: connStateDispatcher, managedPeers: make(map[string]*lazyconn.PeerConfig), managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig), @@ -57,6 +51,22 @@ func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDi return m } +func (m *Manager) Start(ctx context.Context, activeFn func(peerID string), inactiveFn func(peerID string)) { + defer m.close() + + ctx, m.cancel = context.WithCancel(ctx) + for { + select { + case <-ctx.Done(): + return + case e := <-m.activityManager.OnActivityChan: + m.onPeerActivity(ctx, e, activeFn) + case peerConnID := <-m.onInactive: + m.onPeerInactivityTimedOut(peerConnID, inactiveFn) + } + } +} + func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -127,26 +137,6 @@ func (m *Manager) RunInactivityMonitor(peerID string) (found bool) { return true } -func (m *Manager) RunActivityMonitor(peerID string) { - m.managedPeersMu.Lock() - defer m.managedPeersMu.Unlock() - - cfg, ok := m.managedPeers[peerID] - if !ok { - return - } - - cfg.Log.Infof("start activity monitor") - - // just in case free up - m.inactivityMonitors[cfg.PeerConnID].PauseTimer() - - if err := m.activityManager.MonitorPeerActivity(*cfg); err != nil { - cfg.Log.Errorf("failed to create activity monitor: %v", err) - return - } -} - func (m *Manager) ExcludePeer(peerID string) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -154,9 +144,7 @@ func (m *Manager) ExcludePeer(peerID string) { m.excludes[peerID] = struct{}{} } -// Close the manager and all the listeners -// block until all routine are done and cleanup the exported Channels -func (m *Manager) Close() { +func (m *Manager) close() { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -172,21 +160,7 @@ func (m *Manager) Close() { log.Infof("lazy connection manager closed") } -func (m *Manager) Start(ctx context.Context) { - ctx, m.cancel = context.WithCancel(ctx) - for { - select { - case <-ctx.Done(): - return - case e := <-m.activityManager.OnActivityChan: - m.onPeerActivity(ctx, e) - case peerConnID := <-m.onInactive: - m.onPeerInactivityTimedOut(ctx, peerConnID) - } - } -} - -func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent) { +func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent, onActiveListenerFn func(peerID string)) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -200,14 +174,10 @@ func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent pcfg.Log.Infof("starting inactivity monitor") go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive) - select { - case m.OnActive <- e.PeerID: - case <-ctx.Done(): - return - } + onActiveListenerFn(e.PeerID) } -func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peer.ConnID) { +func (m *Manager) onPeerInactivityTimedOut(peerConnID peer.ConnID, onInactiveListenerFn func(peerID string)) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -222,12 +192,18 @@ func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peer. return } - select { - case m.Idle <- pcfg.PublicKey: - case <-ctx.Done(): + onInactiveListenerFn(pcfg.PublicKey) + + pcfg.Log.Infof("start activity monitor") + + // just in case free up + m.inactivityMonitors[pcfg.PeerConnID].PauseTimer() + if err := m.activityManager.MonitorPeerActivity(*pcfg); err != nil { + pcfg.Log.Errorf("failed to create activity monitor: %v", err) return } } + func (m *Manager) onPeerConnected(conn *peer.Conn) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 7c1d12490..9f64d17cb 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -267,6 +267,7 @@ func (conn *Conn) Close() { conn.setStatusToDisconnected() conn.opened = false conn.wg.Wait() + conn.Log.Infof("peer connection closed") } // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise