diff --git a/client/internal/lazyconn/inalt/manager.go b/client/internal/lazyconn/inalt/manager.go new file mode 100644 index 000000000..3cc3f6b5e --- /dev/null +++ b/client/internal/lazyconn/inalt/manager.go @@ -0,0 +1,250 @@ +package inalt + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/iface/configurer" +) + +// Responder: vmp2 +// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes +// - Receive keep alive: 32 bytes, every 25 sec +// Initiator: mp1 +// - Receive handshake response: +// - Receive keep alive: 32 bytes, every 25 sec + +const ( + keepAliveBytes = 32 + keepAliveInterval = 25 * time.Second + handshakeInitBytes = 148 + handshakeRespBytes = 92 + handshakeMaxInterval = 3 * time.Minute + + checkInterval = 5 * time.Second + idleThreshold = 3 + idleCheckDuration = 3 * time.Minute + + // More conservative thresholds accounting for timing variations + protocolOverheadBuffer = 1.5 // 50% buffer for timing variations and extra handshakes + userTrafficMinimum = 1024 // Minimum bytes to consider as actual user activity +) + +type WgInterface interface { + GetStats() (map[string]configurer.WGStats, error) +} + +type peerInfo struct { + lastRxBytesAtLastIdleCheck int64 // cumulative bytes at last 1-minute check + idleCount int + lastIdleCheckAt time.Time + + recentTrafficSamples []int64 + maxSamples int +} + +type Manager struct { + InactivePeersChan chan []string + iface WgInterface + interestedPeers map[string]*peerInfo + + // Dynamic thresholds based on expected patterns + maxProtocolTraffic int64 // Maximum expected for protocol-only traffic + minUserTraffic int64 // Minimum to indicate actual user activity +} + +func NewManager(iface WgInterface) *Manager { + // Calculate maximum expected protocol overhead per check period + numKeepAlives := int(idleCheckDuration / keepAliveInterval) + + // Worst case: multiple handshakes + all keep-alives + // In 3 minutes we 
might see 1-2 handshakes due to timing variations + maxHandshakes := 2 + maxProtocolBytes := int64(numKeepAlives*keepAliveBytes + maxHandshakes*(handshakeInitBytes+handshakeRespBytes)) + + // Apply buffer for timing variations and edge cases + maxProtocolWithBuffer := int64(float64(maxProtocolBytes) * protocolOverheadBuffer) + + // Set user traffic threshold significantly higher than protocol overhead + minUserBytes := max(userTrafficMinimum, maxProtocolWithBuffer*2) + + log.Infof("--- Protocol thresholds - Max protocol overhead: %d bytes, Min user traffic: %d bytes", + maxProtocolWithBuffer, minUserBytes) + + return &Manager{ + InactivePeersChan: make(chan []string, 1), + iface: iface, + interestedPeers: make(map[string]*peerInfo), + maxProtocolTraffic: maxProtocolWithBuffer, + minUserTraffic: minUserBytes, + } +} + +func (m *Manager) AddPeer(peer string) { + if _, exists := m.interestedPeers[peer]; !exists { + m.interestedPeers[peer] = &peerInfo{ + maxSamples: 5, // Keep last 5 traffic samples for trend analysis + } + } +} + +func (m *Manager) RemovePeer(peer string) { + delete(m.interestedPeers, peer) +} + +func (m *Manager) Start(ctx context.Context) { + ticker := newTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C(): + idlePeers, err := m.checkStats() + if err != nil { + continue + } + + if len(idlePeers) == 0 { + continue + } + select { + case m.InactivePeersChan <- idlePeers: + case <-ctx.Done(): + continue + default: + continue + } + } + } +} + +func (m *Manager) checkStats() ([]string, error) { + stats, err := m.iface.GetStats() + if err != nil { + return nil, err + } + + now := time.Now() + var idlePeers []string + + for peer, info := range m.interestedPeers { + stat, found := stats[peer] + if !found { + continue + } + + // First measurement: initialize + if info.lastIdleCheckAt.IsZero() { + info.lastIdleCheckAt = now + info.lastRxBytesAtLastIdleCheck = stat.RxBytes + continue + } + + 
minDuration := idleCheckDuration - (checkInterval / 2) + if now.Sub(info.lastIdleCheckAt) >= minDuration { + rxDelta := stat.RxBytes - info.lastRxBytesAtLastIdleCheck + info.lastRxBytesAtLastIdleCheck = stat.RxBytes + + // Store traffic sample for trend analysis + info.recentTrafficSamples = append(info.recentTrafficSamples, rxDelta) + if len(info.recentTrafficSamples) > info.maxSamples { + info.recentTrafficSamples = info.recentTrafficSamples[1:] + } + + log.Infof("--- RxBytes delta: %d, samples: %v", rxDelta, info.recentTrafficSamples) + + // Improved idle detection logic + isIdle := m.evaluateIdleState(peer, info, rxDelta) + + if isIdle { + info.idleCount++ + } else { + info.idleCount = 0 + } + + info.lastIdleCheckAt = now + + if info.idleCount >= idleThreshold { + idlePeers = append(idlePeers, peer) + info.idleCount = 0 // reset after detecting idle + log.Infof("--- detected as idle after %d consecutive checks", idleThreshold) + } + } + } + + return idlePeers, nil +} + +// evaluateIdleState determines if a peer is idle based on traffic patterns +func (m *Manager) evaluateIdleState(peer string, info *peerInfo, currentTraffic int64) bool { + // Clear case: significant user traffic detected + if currentTraffic >= m.minUserTraffic { + log.Infof("--- active - user traffic detected: %d >= %d bytes", currentTraffic, m.minUserTraffic) + return false + } + + // Traffic is within protocol overhead range - likely idle + if currentTraffic <= m.maxProtocolTraffic { + log.Infof("--- idle - only protocol traffic: %d <= %d bytes", currentTraffic, m.maxProtocolTraffic) + return true + } + + // Traffic is between protocol overhead and user traffic thresholds + // This is the ambiguous zone - use trend analysis if available + if len(info.recentTrafficSamples) >= 3 { + avgRecent := m.calculateAverage(info.recentTrafficSamples) + maxRecent := m.findMaximum(info.recentTrafficSamples) + + // If recent average is consistently low and max is also low, likely idle + if avgRecent <= 
float64(m.maxProtocolTraffic) && maxRecent <= m.maxProtocolTraffic {
+			log.Infof("--- trending idle - avg: %.2f, max: %d, both <= %d bytes", avgRecent, maxRecent, m.maxProtocolTraffic)
+			return true
+		}
+
+		// If we've seen user-level traffic recently, consider active
+		if maxRecent >= m.minUserTraffic {
+			log.Infof("--- %s recently active - max recent traffic: %d >= %d bytes", peer, maxRecent, m.minUserTraffic)
+			return false
+		}
+	}
+
+	// In ambiguous cases with insufficient data, be conservative
+	// Slight preference for idle since this traffic level suggests minimal activity
+	log.Infof("--- %s ambiguous traffic %d bytes - assuming idle (between %d and %d)", peer, currentTraffic, m.maxProtocolTraffic, m.minUserTraffic)
+	return true
+}
+
+func (m *Manager) calculateAverage(samples []int64) float64 {
+	if len(samples) == 0 {
+		return 0
+	}
+	var sum int64
+	for _, sample := range samples {
+		sum += sample
+	}
+	return float64(sum) / float64(len(samples))
+}
+
+func (m *Manager) findMaximum(samples []int64) int64 {
+	if len(samples) == 0 {
+		return 0
+	}
+	maxVal := samples[0]
+	for _, sample := range samples[1:] {
+		if sample > maxVal {
+			maxVal = sample
+		}
+	}
+	return maxVal
+}
+
+func max(a, b int64) int64 {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/client/internal/lazyconn/inalt/manager_test.go b/client/internal/lazyconn/inalt/manager_test.go
new file mode 100644
index 000000000..c5bb8b763
--- /dev/null
+++ b/client/internal/lazyconn/inalt/manager_test.go
@@ -0,0 +1,17 @@
+package inalt
+
+import (
+	"testing"
+	"time"
+)
+
+func init() {
+	// Override the ticker factory for testing
+	newTicker = func(d time.Duration) Ticker {
+		return newFakeTicker(d)
+	}
+}
+
+func TestNewManager(t *testing.T) {
+
+}
diff --git a/client/internal/lazyconn/inalt/ticker.go b/client/internal/lazyconn/inalt/ticker.go
new file mode 100644
index 000000000..e3a4e4b67
--- /dev/null
+++ b/client/internal/lazyconn/inalt/ticker.go
@@ -0,0 +1,24 @@
+package inalt
+
+import "time"
+
+var newTicker = func(d time.Duration) Ticker { + return &realTicker{t: time.NewTicker(d)} +} + +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + t *time.Ticker +} + +func (r *realTicker) C() <-chan time.Time { + return r.t.C +} + +func (r *realTicker) Stop() { + r.t.Stop() +} diff --git a/client/internal/lazyconn/inalt/ticker_moc_test.go b/client/internal/lazyconn/inalt/ticker_moc_test.go new file mode 100644 index 000000000..2c0625a6f --- /dev/null +++ b/client/internal/lazyconn/inalt/ticker_moc_test.go @@ -0,0 +1,34 @@ +package inalt + +import ( + "time" +) + +// fakeTicker is a controllable ticker for use in tests +type fakeTicker struct { + ch chan time.Time + now time.Time + interval time.Duration +} + +func newFakeTicker(d time.Duration) *fakeTicker { + return &fakeTicker{ + ch: make(chan time.Time, 1), + now: time.Now(), + interval: d, + } +} + +// C returns the channel to receive "ticks" — does not push values itself +func (f *fakeTicker) C() <-chan time.Time { + return f.ch +} + +// Tick simulates advancing time and sending a tick +func (f *fakeTicker) Tick() { + f.now = f.now.Add(f.interval) // use your desired interval + f.ch <- f.now +} + +// Stop is a no-op for fakeTicker +func (f *fakeTicker) Stop() {} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 718bdbddf..47a22609d 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -11,6 +11,7 @@ import ( "github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn/activity" "github.com/netbirdio/netbird/client/internal/lazyconn/inactivity" + "github.com/netbirdio/netbird/client/internal/lazyconn/inalt" "github.com/netbirdio/netbird/client/internal/peer/dispatcher" peerid "github.com/netbirdio/netbird/client/internal/peer/id" "github.com/netbirdio/netbird/client/internal/peerstore" @@ -52,15 +53,13 @@ type 
Manager struct { excludes map[string]lazyconn.PeerConfig managedPeersMu sync.Mutex - activityManager *activity.Manager - inactivityMonitors map[peerid.ConnID]*inactivity.Monitor + activityManager *activity.Manager + inactivityManager *inalt.Manager // Route HA group management peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group routesMu sync.RWMutex // protects route mappings - - onInactive chan peerid.ConnID } // NewManager creates a new lazy connection manager @@ -76,10 +75,9 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S managedPeersByConnID: make(map[peerid.ConnID]*managedPeer), excludes: make(map[string]lazyconn.PeerConfig), activityManager: activity.NewManager(wgIface), - inactivityMonitors: make(map[peerid.ConnID]*inactivity.Monitor), + inactivityManager: inalt.NewManager(wgIface), peerToHAGroups: make(map[string][]route.HAUniqueID), haGroupToPeers: make(map[route.HAUniqueID][]string), - onInactive: make(chan peerid.ConnID), } if config.InactivityThreshold != nil { @@ -139,14 +137,21 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) { func (m *Manager) Start(ctx context.Context) { defer m.close() + go m.inactivityManager.Start(ctx) + for { select { case <-ctx.Done(): return case peerConnID := <-m.activityManager.OnActivityChan: m.onPeerActivity(ctx, peerConnID) - case peerConnID := <-m.onInactive: - m.onPeerInactivityTimedOut(peerConnID) + case _ = <-m.inactivityManager.InactivePeersChan: + /* + for _, peerID := range peerIDs { + m.onPeerInactivityTimedOut(peerID) + } + + */ } } } @@ -217,8 +222,7 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { return false, err } - im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold) - m.inactivityMonitors[peerCfg.PeerConnID] = im + m.inactivityManager.AddPeer(peerCfg.PublicKey) m.managedPeers[peerCfg.PublicKey] = 
&peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ @@ -301,14 +305,8 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) - im, ok := m.inactivityMonitors[cfg.PeerConnID] - if !ok { - cfg.Log.Errorf("inactivity monitor not found for peer") - return false - } - - cfg.Log.Infof("starting inactivity monitor") - go im.Start(ctx, m.onInactive) + cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey) + m.inactivityManager.AddPeer(cfg.PublicKey) return true } @@ -360,9 +358,6 @@ func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig return nil } - im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold) - m.inactivityMonitors[peerCfg.PeerConnID] = im - m.managedPeers[peerCfg.PublicKey] = &peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ peerCfg: &peerCfg, @@ -370,7 +365,7 @@ func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig } peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list") - go im.Start(ctx, m.onInactive) + m.inactivityManager.AddPeer(peerCfg.PublicKey) return nil } @@ -382,12 +377,7 @@ func (m *Manager) removePeer(peerID string) { cfg.Log.Infof("removing lazy peer") - if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok { - im.Stop() - delete(m.inactivityMonitors, cfg.PeerConnID) - cfg.Log.Debugf("inactivity monitor stopped") - } - + m.inactivityManager.RemovePeer(cfg.PublicKey) m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) delete(m.managedPeers, peerID) delete(m.managedPeersByConnID, cfg.PeerConnID) @@ -399,10 +389,7 @@ func (m *Manager) close() { m.connStateDispatcher.RemoveListener(m.connStateListener) m.activityManager.Close() - for _, iw := range m.inactivityMonitors { - iw.Stop() - } - m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor) + m.managedPeers = 
make(map[string]*lazyconn.PeerConfig) m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer) @@ -441,13 +428,19 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) } -func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) { +func (m *Manager) onPeerInactivityTimedOut(peerID string) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - mp, ok := m.managedPeersByConnID[peerConnID] + peerCfg, ok := m.managedPeers[peerID] if !ok { - log.Errorf("peer not found by id: %v", peerConnID) + log.Errorf("peer not found by peerId: %v", peerID) + return + } + + mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID] + if !ok { + log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID) return } @@ -465,8 +458,7 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) { mp.expectedWatcher = watcherActivity - // just in case free up - m.inactivityMonitors[peerConnID].PauseTimer() + m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey) if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil { mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err) @@ -487,14 +479,8 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) { return } - iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] - if !ok { - mp.peerCfg.Log.Errorf("inactivity monitor not found for peer") - return - } - mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected") - iw.PauseTimer() + m.inactivityManager.AddPeer(mp.peerCfg.PublicKey) } func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) { @@ -510,11 +496,7 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) { return } - iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] - if !ok { - return - } + // todo reset inactivity monitor mp.peerCfg.Log.Infof("reset inactivity monitor timer") - iw.ResetTimer() }