diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index 70e5e281c..0cb8d90c9 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -24,19 +24,19 @@ const ( // - Maintaining a list of excluded peers that should always have permanent connections // - Handling connection establishment based on peer signaling type ConnMgr struct { - lazyConnMgr *lazyConnManager.Manager peerStore *peerstore.Store + lazyConnMgr *lazyConnManager.Manager - excludes map[string]struct{} + connStateListener *peer.ConnectionListener wg sync.WaitGroup ctxCancel context.CancelFunc } -func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface) *ConnMgr { +func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr { var lazyConnMgr *lazyConnManager.Manager if os.Getenv(envDisableLazyConn) != "true" { - lazyConnMgr = lazyConnManager.NewManager(iface) + lazyConnMgr = lazyConnManager.NewManager(iface, dispatcher) } e := &ConnMgr{ @@ -56,14 +56,11 @@ func (e *ConnMgr) Start(parentCtx context.Context) { e.ctxCancel = cancel e.wg.Add(1) - go func() { - defer e.wg.Done() - e.receiveLazyConnEvents(ctx) - }() + go e.receiveLazyEvents(ctx) } func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) { - e.excludes[peerID] = struct{}{} + e.lazyConnMgr.ExcludePeer(peerID) } func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { @@ -76,21 +73,24 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { return } - _, exists = e.excludes[peerKey] - if exists { - conn.Open() - return - } - lazyPeerCfg := lazyconn.PeerConfig{ PublicKey: peerKey, AllowedIPs: conn.WgConfig().AllowedIps, PeerConnID: conn.ConnID(), } - if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil { - log.Errorf("failed to add peer to lazyconn manager: %v", err) + excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) + if err != nil { + conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) conn.Open() + return } + + if excluded { + conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection") + conn.Open() + return + } + conn.Log.Infof("peer added to lazy conn manager") return } @@ -138,14 +138,17 @@ func (e *ConnMgr) Close() { e.lazyConnMgr = nil } -func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) { +func (e *ConnMgr) receiveLazyEvents(ctx context.Context) { + defer e.wg.Done() for { - peerID, err := e.lazyConnMgr.NextEvent(ctx) - if err != nil { - log.Infof("lazy connection manager closed: %v", err) + select { + case <-ctx.Done(): return + case peerID := <-e.lazyConnMgr.OnDemand: + e.peerStore.PeerConnOpen(peerID) + case peerID := <-e.lazyConnMgr.Idle: + e.peerStore.PeerConnClose(peerID) } - e.peerStore.PeerConnOpen(peerID) } } diff --git a/client/internal/lazyconn/manager/idle.go b/client/internal/lazyconn/manager/idle.go new file mode 100644 index 000000000..e124acd49 --- /dev/null +++ b/client/internal/lazyconn/manager/idle.go @@ -0,0 +1,54 @@ +package manager + +import ( + "context" + "time" +) + +const ( + idleTimeout = 60 * time.Minute // went to idle after 1 hour inactivity +) + +type IdleWatch struct { + onIdle chan struct{} + timer *time.Timer +} + +func NewIdleWatch() *IdleWatch { + i := &IdleWatch{ + onIdle: make(chan struct{}, 1), + timer: time.NewTimer(0), + } + i.timer.Stop() + return i +} + +// call on open connection +func (i *IdleWatch) Start(ctx context.Context) { + i.timer.Reset(idleTimeout) + defer i.timer.Stop() + + select { + case <-i.timer.C: + select { + case i.Idle <- struct{}{}: + default: + } + case <-ctx.Done(): + return + } +} + +func (i *IdleWatch) Stop() { + // todo implement +} + +// call when connected +func (i *IdleWatch) HangUp() { + i.timer.Stop() +} + +// call when switch to None priority +func (i *IdleWatch) Reset() { + i.timer.Reset(idleTimeout) +} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 68a282122..c645cce68 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -9,43 +9,69 @@ import ( "github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn/listener" + "github.com/netbirdio/netbird/client/internal/peer" ) // Manager manages lazy connections // This is not a thread safe implementation, do not call exported functions concurrently type Manager struct { - listenerMgr *listener.Manager - managedPeers map[string]lazyconn.PeerConfig + OnDemand chan string + Idle chan string + + listenerMgr *listener.Manager + managedPeers map[string]lazyconn.PeerConfig + idleWatch map[string]*IdleWatch + + excludes map[string]struct{} managedPeersMu sync.Mutex closeChan chan struct{} } -func NewManager(wgIface lazyconn.WGIface) *Manager { +func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager { m := &Manager{ + OnDemand: make(chan string, 1), + Idle: make(chan string, 1), + listenerMgr: listener.NewManager(wgIface), managedPeers: make(map[string]lazyconn.PeerConfig), + idleWatch: make(map[string]*IdleWatch), + excludes: make(map[string]struct{}), closeChan: make(chan struct{}), } + + connStateListener := &peer.ConnectionListener{ + OnConnected: m.onPeerConnected, + OnDisconnected: m.onPeerDisconnected, + } + + connStateDispatcher.AddListener(connStateListener) + e.connStateListener = connStateListener + return m } -func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { +func (m *Manager) AddPeer(peer lazyconn.PeerConfig) (bool, error) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() log.Debugf("adding lazy peer: %s", peer.PublicKey) + _, exists := m.excludes[peer.PublicKey] + if exists { + return true, nil + } + if _, ok := m.managedPeers[peer.PublicKey]; ok { log.Warnf("peer already managed: %s", peer.PublicKey) - return nil + return false, nil } if err := m.listenerMgr.CreatePeerListener(peer); err != nil { - return err + return false, err } m.managedPeers[peer.PublicKey] = peer - return nil + return false, nil } func (m *Manager) RemovePeer(peerID string) bool { @@ -58,11 +84,23 @@ func (m *Manager) RemovePeer(peerID string) bool { log.Debugf("removing lazy peer: %s", peerID) + if idleWatch, ok := m.idleWatch[peerID]; ok { + idleWatch.Stop() + delete(m.idleWatch, peerID) + } + m.listenerMgr.RemovePeer(peerID) delete(m.managedPeers, peerID) return true } +func (m *Manager) ExcludePeer(peerID string) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + m.excludes[peerID] = struct{}{} +} + // Close the manager and all the listeners // block until all routine are done and cleanup the exported Channels func (m *Manager) Close() { @@ -77,7 +115,7 @@ func (m *Manager) Close() { m.managedPeers = make(map[string]lazyconn.PeerConfig) } -func (m *Manager) NextEvent(ctx context.Context) (string, error) { +func (m *Manager) Start(ctx context.Context) (string, error) { for { select { case <-m.closeChan: @@ -86,7 +124,6 @@ func (m *Manager) NextEvent(ctx context.Context) (string, error) { return "", ctx.Err() case e := <-m.listenerMgr.TrafficStartChan: m.managedPeersMu.Lock() - // todo instead of peer ID check, check by the peer conn instance id pcfg, ok := m.managedPeers[e.PeerID] if !ok { m.managedPeersMu.Unlock() @@ -98,8 +135,77 @@ func (m *Manager) NextEvent(ctx context.Context) (string, error) { continue } + idleWatch := NewIdleWatch() + idleWatch.Start(ctx) + m.idleWatch[e.PeerID] = idleWatch + m.managedPeersMu.Unlock() return e.PeerID, nil } } } + +/* +func (m *Manager) NextOpenEvent(ctx context.Context) (string, error) { + for { + select { + case <-m.closeChan: + return "", fmt.Errorf("service closed") + case <-ctx.Done(): + return "", ctx.Err() + case e := <-m.listenerMgr.TrafficStartChan: + m.managedPeersMu.Lock() + pcfg, ok := m.managedPeers[e.PeerID] + if !ok { + m.managedPeersMu.Unlock() + continue + } + + if pcfg.PeerConnID != e.PeerConnId { + m.managedPeersMu.Unlock() + continue + } + + idleWatch := NewIdleWatch() + idleWatch.Start(ctx) + m.idleWatch[e.PeerID] = idleWatch + + m.managedPeersMu.Unlock() + return e.PeerID, nil + } + } +} + +*/ + +func (m *Manager) onPeerConnected(conn *peer.Conn) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + if _, ok := m.excludes[conn.GetKey()]; ok { + return + } + + iw, ok := m.idleWatch[conn.GetKey()] + if !ok { + conn.Log.Errorf("idle watch not found for peer") + } + + iw.HangUp() +} + +func (m *Manager) onPeerDisconnected(conn *peer.Conn) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + if _, ok := m.excludes[conn.GetKey()]; ok { + return + } + + iw, ok := m.idleWatch[conn.GetKey()] + if !ok { + conn.Log.Errorf("idle watch not found for peer") + } + + iw.Reset() +} diff --git a/client/internal/lazyconn/watcher/watcher.go b/client/internal/lazyconn/watcher/watcher.go deleted file mode 100644 index ef20f5cde..000000000 --- a/client/internal/lazyconn/watcher/watcher.go +++ /dev/null @@ -1,103 +0,0 @@ -package watcher - -import ( - "context" - "sync" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/client/iface/configurer" - "github.com/netbirdio/netbird/client/internal/lazyconn" -) - -const ( - checkPeriod = 75 * time.Second // 3 * keep alive time (25s) - expectedMinimumRx = 90 * 2 // 2x keep alive packets -) - -type rxHistory struct { - received int64 -} - -// Watcher checks for peer timeouts -// Todo: this is a naive implementation, we must to finish it -type Watcher struct { - PeerTimedOutChan chan string - - wgIface lazyconn.WGIface - - peers map[string]*rxHistory - peersMu sync.Mutex -} - -func NewWatcher(wgIface lazyconn.WGIface) *Watcher { - return &Watcher{ - PeerTimedOutChan: make(chan string, 1), - wgIface: wgIface, - peers: make(map[string]*rxHistory), - } -} - -func (m *Watcher) Watch(ctx context.Context) { - timer := time.NewTimer(checkPeriod) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - stats, err := m.wgIface.GetStats() - if err != nil { - log.Errorf("failed to get peer stats: %s", err) - continue - } - m.checkTimeouts(ctx, stats) - } - } -} - -func (m *Watcher) AddPeer(peerID string) { - m.peersMu.Lock() - defer m.peersMu.Unlock() - - m.peers[peerID] = &rxHistory{} -} - -func (m *Watcher) RemovePeer(id string) { - m.peersMu.Lock() - defer m.peersMu.Unlock() - - delete(m.peers, id) -} - -func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[string]configurer.WGStats) { - m.peersMu.Lock() - defer m.peersMu.Unlock() - - for p, rxh := range m.peers { - s, ok := allPeersStats[p] - if !ok { - log.Warnf("no stats for peer %s", p) - } - - // received bytes since last check - received := s.RxBytes - rxh.received - if received >= expectedMinimumRx { - rxh.received = s.RxBytes - continue - } - - // todo handle that case when swtich from P2P to Relay and the endpoint has been reseted. - - // peer timed out - delete(m.peers, p) - - select { - case <-ctx.Done(): - return - case m.PeerTimedOutChan <- p: - } - } -} diff --git a/client/internal/peer/dispatcher.go b/client/internal/peer/dispatcher.go new file mode 100644 index 000000000..077d336d4 --- /dev/null +++ b/client/internal/peer/dispatcher.go @@ -0,0 +1,59 @@ +package peer + +import ( + "sync" +) + +/* + handler := peer.ConnectionListener{ + OnConnected: m.onPeerConnected, + OnDisconnected: m.onPeerDisconnected, + } + + dispatcher.AddListener(handler) +*/ + +type ConnectionListener struct { + OnConnected func(peer *Conn) + OnDisconnected func(peer *Conn) +} + +type ConnectionDispatcher struct { + listeners map[*ConnectionListener]struct{} + mu sync.Mutex +} + +func NewConnectionDispatcher() *ConnectionDispatcher { + return &ConnectionDispatcher{ + listeners: make(map[*ConnectionListener]struct{}), + } +} + +func (e *ConnectionDispatcher) AddListener(listener *ConnectionListener) { + e.mu.Lock() + defer e.mu.Unlock() + e.listeners[listener] = struct{}{} +} + +func (e *ConnectionDispatcher) RemoveListener(listener *ConnectionListener) { + e.mu.Lock() + defer e.mu.Unlock() + + delete(e.listeners, listener) +} + +func (e *ConnectionDispatcher) NotifyConnected(peer *Conn) { + e.mu.Lock() + defer e.mu.Unlock() + for listener, _ := range e.listeners { + listener.OnConnected(peer) + } +} + +func (e *ConnectionDispatcher) NotifyDisconnected(peer *Conn) { + e.mu.Lock() + defer e.mu.Unlock() + for listener, _ := range e.listeners { + listener.OnDisconnected(peer) + } +} diff --git a/client/internal/peerstore/store.go b/client/internal/peerstore/store.go index d1bab85e4..e1e37b907 100644 --- a/client/internal/peerstore/store.go +++ b/client/internal/peerstore/store.go @@ -91,6 +91,17 @@ func (s *Store) PeerConnOpen(pubKey string) { p.Open() } +func (s *Store) PeerConnClose(pubKey string) { + s.peerConnsMu.RLock() + defer s.peerConnsMu.RUnlock() + + p, ok := s.peerConns[pubKey] + if !ok { + return + } + p.Close() +} + func (s *Store) PeersPubKey() []string { s.peerConnsMu.RLock() defer s.peerConnsMu.RUnlock()