From f3a5e34c3fbd34c4a43f54282be4ae8c8780b0b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 18 Jun 2025 15:31:57 +0200 Subject: [PATCH] Change the logic and add moc data --- client/iface/bind/ice_bind.go | 1 + .../lazyconn/inactivity/inactivity.go | 70 ----- .../lazyconn/inactivity/inactivity_test.go | 156 ----------- .../internal/lazyconn/inactivity/manager.go | 174 ++++++++++++ .../lazyconn/inactivity/manager_test.go | 46 ++++ .../internal/lazyconn/inactivity/moc_test.go | 102 +++++++ .../lazyconn/inactivity/scenarios_test.go | 103 ++++++++ .../lazyconn/{inalt => inactivity}/ticker.go | 2 +- client/internal/lazyconn/inalt/manager.go | 250 ------------------ .../internal/lazyconn/inalt/manager_test.go | 17 -- .../lazyconn/inalt/ticker_moc_test.go | 34 --- client/internal/lazyconn/manager/manager.go | 34 ++- 12 files changed, 442 insertions(+), 547 deletions(-) delete mode 100644 client/internal/lazyconn/inactivity/inactivity.go delete mode 100644 client/internal/lazyconn/inactivity/inactivity_test.go create mode 100644 client/internal/lazyconn/inactivity/manager.go create mode 100644 client/internal/lazyconn/inactivity/manager_test.go create mode 100644 client/internal/lazyconn/inactivity/moc_test.go create mode 100644 client/internal/lazyconn/inactivity/scenarios_test.go rename client/internal/lazyconn/{inalt => inactivity}/ticker.go (94%) delete mode 100644 client/internal/lazyconn/inalt/manager.go delete mode 100644 client/internal/lazyconn/inalt/manager_test.go delete mode 100644 client/internal/lazyconn/inalt/ticker_moc_test.go diff --git a/client/iface/bind/ice_bind.go b/client/iface/bind/ice_bind.go index 66ec6a00d..5936d0dca 100644 --- a/client/iface/bind/ice_bind.go +++ b/client/iface/bind/ice_bind.go @@ -198,6 +198,7 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r if sizes[i] == 0 { continue } + log.Infof("--- received Datagram %s from %s, size: %d", msg.Addr, msg.Addr.String(), sizes[i]) addrPort := msg.Addr.(*net.UDPAddr).AddrPort() ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep) diff --git a/client/internal/lazyconn/inactivity/inactivity.go b/client/internal/lazyconn/inactivity/inactivity.go deleted file mode 100644 index a30c1846d..000000000 --- a/client/internal/lazyconn/inactivity/inactivity.go +++ /dev/null @@ -1,70 +0,0 @@ -package inactivity - -import ( - "context" - "time" - - peer "github.com/netbirdio/netbird/client/internal/peer/id" -) - -const ( - DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity - MinimumInactivityThreshold = 3 * time.Minute -) - -type Monitor struct { - id peer.ConnID - timer *time.Timer - cancel context.CancelFunc - inactivityThreshold time.Duration -} - -func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor { - i := &Monitor{ - id: peerID, - timer: time.NewTimer(0), - inactivityThreshold: threshold, - } - i.timer.Stop() - return i -} - -func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) { - i.timer.Reset(i.inactivityThreshold) - defer i.timer.Stop() - - ctx, i.cancel = context.WithCancel(ctx) - defer func() { - defer i.cancel() - select { - case <-i.timer.C: - default: - } - }() - - select { - case <-i.timer.C: - select { - case timeoutChan <- i.id: - case <-ctx.Done(): - return - } - case <-ctx.Done(): - return - } -} - -func (i *Monitor) Stop() { - if i.cancel == nil { - return - } - i.cancel() -} - -func (i *Monitor) PauseTimer() { - i.timer.Stop() -} - -func (i *Monitor) ResetTimer() { - i.timer.Reset(i.inactivityThreshold) -} diff --git a/client/internal/lazyconn/inactivity/inactivity_test.go b/client/internal/lazyconn/inactivity/inactivity_test.go deleted file mode 100644 index 944512985..000000000 --- a/client/internal/lazyconn/inactivity/inactivity_test.go +++ /dev/null @@ -1,156 +0,0 @@ -package inactivity - -import ( - "context" - "testing" - "time" - - peerid "github.com/netbirdio/netbird/client/internal/peer/id" -) - -type MocPeer struct { -} - -func (m *MocPeer) ConnID() peerid.ConnID { - return peerid.ConnID(m) -} - -func TestInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - defer testTimeoutCancel() - - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), time.Second*2) - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(tCtx, timeoutChan) - }() - - select { - case <-timeoutChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } - - select { - case <-exitChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } -} - -func TestReuseInactivityMonitor(t *testing.T) { - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), time.Second*2) - - timeoutChan := make(chan peerid.ConnID) - - for i := 2; i > 0; i-- { - exitChan := make(chan struct{}) - - testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - - go func() { - defer close(exitChan) - im.Start(testTimeoutCtx, timeoutChan) - }() - - select { - case <-timeoutChan: - case <-testTimeoutCtx.Done(): - t.Fatal("timeout") - } - - select { - case <-exitChan: - case <-testTimeoutCtx.Done(): - t.Fatal("timeout") - } - testTimeoutCancel() - } -} - -func TestStopInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5) - defer testTimeoutCancel() - - p := &MocPeer{} - im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold) - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(tCtx, timeoutChan) - }() - - go func() { - time.Sleep(3 * time.Second) - im.Stop() - }() - - select { - case <-timeoutChan: - t.Fatal("unexpected timeout") - case <-exitChan: - case <-tCtx.Done(): - t.Fatal("timeout") - } -} - -func TestPauseInactivityMonitor(t *testing.T) { - tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10) - defer testTimeoutCancel() - - p := &MocPeer{} - trashHold := time.Second * 3 - im := NewInactivityMonitor(p.ConnID(), trashHold) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - timeoutChan := make(chan peerid.ConnID) - - exitChan := make(chan struct{}) - - go func() { - defer close(exitChan) - im.Start(ctx, timeoutChan) - }() - - time.Sleep(1 * time.Second) // grant time to start the monitor - im.PauseTimer() - - // check to do not receive timeout - thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second) - defer thresholdCancel() - select { - case <-exitChan: - t.Fatal("unexpected exit") - case <-timeoutChan: - t.Fatal("unexpected timeout") - case <-thresholdCtx.Done(): - // test ok - case <-tCtx.Done(): - t.Fatal("test timed out") - } - - // test reset timer - im.ResetTimer() - - select { - case <-tCtx.Done(): - t.Fatal("test timed out") - case <-exitChan: - t.Fatal("unexpected exit") - case <-timeoutChan: - // expected timeout - } -} diff --git a/client/internal/lazyconn/inactivity/manager.go b/client/internal/lazyconn/inactivity/manager.go new file mode 100644 index 000000000..f23725594 --- /dev/null +++ b/client/internal/lazyconn/inactivity/manager.go @@ -0,0 +1,174 @@ +package inactivity + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/iface/configurer" + "github.com/netbirdio/netbird/client/internal/lazyconn" +) + +// Responder: vmp2 +// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes +// - Receive keep alive: 32 bytes, every 25 sec +// Initiator: mp1 +// - Receive handshake response: +// - Receive keep alive: 32 bytes, every 25 sec + +const ( + keepAliveBytes = 32 + keepAliveInterval = 25 * time.Second + handshakeInitBytes = 148 + handshakeRespBytes = 92 + handshakeMaxInterval = 3 * time.Minute + + checkInterval = keepAliveInterval // todo: 5 * time.Second + keepAliveCheckPeriod = keepAliveInterval +) + +const ( + // todo make it configurable + DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity + MinimumInactivityThreshold = 3 * time.Minute +) + +type WgInterface interface { + GetStats() (map[string]configurer.WGStats, error) +} + +type peerInfo struct { + lastRxBytesAtLastIdleCheck int64 + lastIdleCheckAt time.Time + inActivityInRow int + log *log.Entry +} + +type Manager struct { + InactivePeersChan chan []string + iface WgInterface + interestedPeers map[string]*peerInfo +} + +func NewManager(iface WgInterface) *Manager { + return &Manager{ + InactivePeersChan: make(chan []string, 1), + iface: iface, + interestedPeers: make(map[string]*peerInfo), + } +} + +func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) { + if _, exists := m.interestedPeers[peerCfg.PublicKey]; !exists { + m.interestedPeers[peerCfg.PublicKey] = &peerInfo{ + log: peerCfg.Log, + } + } +} + +func (m *Manager) RemovePeer(peer string) { + delete(m.interestedPeers, peer) +} + +func (m *Manager) Start(ctx context.Context) { + ticker := newTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case tickTime := <-ticker.C(): + idlePeers, err := m.checkStats(tickTime) + if err != nil { + log.Errorf("error checking stats: %v", err) + return + } + + if len(idlePeers) == 0 { + continue + } + select { + case m.InactivePeersChan <- idlePeers: + case <-ctx.Done(): + continue + default: + continue + } + } + } +} + +func (m *Manager) checkStats(now time.Time) ([]string, error) { + stats, err := m.iface.GetStats() + if err != nil { + return nil, err + } + + var idlePeers []string + + for peer, info := range m.interestedPeers { + stat, found := stats[peer] + if !found { + info.log.Warnf("peer not found in wg stats") + continue + } + + // First measurement: initialize + if info.lastIdleCheckAt.IsZero() { + info.lastIdleCheckAt = now + info.lastRxBytesAtLastIdleCheck = stat.RxBytes + info.log.Infof("initializing RxBytes: %v, %v", now, stat.RxBytes) + continue + } + + // check only every idleCheckDuration + if shouldSkipIdleCheck(now, info.lastIdleCheckAt) { + continue + } + + // sometimes we measure false inactivity, so we need to check if we have activity in a row + inactive := isInactive(stat, info) + if inactive { + info.inActivityInRow++ + } else { + info.inActivityInRow = 0 + } + + if info.inActivityInRow >= 3 { + info.log.Infof("peer is inactive for %d checks, marking as inactive", info.inActivityInRow) + idlePeers = append(idlePeers, peer) + } + info.lastIdleCheckAt = now + info.lastRxBytesAtLastIdleCheck = stat.RxBytes + } + + return idlePeers, nil +} + +func isInactive(stat configurer.WGStats, info *peerInfo) bool { + rxSyncPrevPeriod := stat.RxBytes - info.lastRxBytesAtLastIdleCheck + switch rxSyncPrevPeriod { + case 0: + info.log.Tracef("peer inactive, received 0 bytes") + return true + case keepAliveBytes: + info.log.Tracef("peer inactive, only keep alive received, current RxBytes: %d", rxSyncPrevPeriod) + return true + case handshakeInitBytes + keepAliveBytes: + info.log.Tracef("peer inactive, only handshakeInitBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod) + return true + case handshakeRespBytes + keepAliveBytes: + info.log.Tracef("peer inactive, only handshakeRespBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod) + return true + default: + info.log.Infof("active, RxBytes: %d", rxSyncPrevPeriod) + return false + } +} + +func shouldSkipIdleCheck(now, lastIdleCheckAt time.Time) bool { + minDuration := keepAliveCheckPeriod - (checkInterval / 2) + return now.Sub(lastIdleCheckAt) < minDuration +} diff --git a/client/internal/lazyconn/inactivity/manager_test.go b/client/internal/lazyconn/inactivity/manager_test.go new file mode 100644 index 000000000..2e9783d21 --- /dev/null +++ b/client/internal/lazyconn/inactivity/manager_test.go @@ -0,0 +1,46 @@ +package inactivity + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/netbirdio/netbird/util" +) + +func init() { + _ = util.InitLog("trace", "console") +} + +func TestNewManager(t *testing.T) { + for i, sc := range scenarios { + timer := NewFakeTimer() + newTicker = func(d time.Duration) Ticker { + return newFakeTicker(d, timer) + } + + t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) { + mock := newMockWgInterface("peer1", sc.Data, timer) + manager := NewManager(mock) + manager.AddPeer("peer1") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + manager.Start(ctx) + + var inactiveResult bool + select { + case <-manager.InactivePeersChan: + inactiveResult = true + default: + inactiveResult = false + } + + if inactiveResult != sc.ExpectedInactive { + t.Errorf("Expected inactive peers: %v, got: %v", sc.ExpectedInactive, inactiveResult) + } + }) + } +} diff --git a/client/internal/lazyconn/inactivity/moc_test.go b/client/internal/lazyconn/inactivity/moc_test.go new file mode 100644 index 000000000..90826ffaa --- /dev/null +++ b/client/internal/lazyconn/inactivity/moc_test.go @@ -0,0 +1,102 @@ +package inactivity + +import ( + "fmt" + "time" + + "github.com/netbirdio/netbird/client/iface/configurer" +) + +type rxHistory struct { + when time.Duration + RxBytes int64 +} + +// mockWgInterface mocks WgInterface to simulate peer stats. +type mockWgInterface struct { + peerID string + statsSequence []rxHistory + timer *FakeTimer + initialTime time.Time + reachedLast bool +} + +func newMockWgInterface(peerID string, history []rxHistory, timer *FakeTimer) *mockWgInterface { + return &mockWgInterface{ + peerID: peerID, + statsSequence: history, + timer: timer, + initialTime: timer.Now(), + } +} + +func (m *mockWgInterface) GetStats() (map[string]configurer.WGStats, error) { + if m.reachedLast { + return nil, fmt.Errorf("no more data") + } + + now := m.timer.Now() + var rx int64 + for i, history := range m.statsSequence { + if now.Before(m.initialTime.Add(history.when)) { + break + } + + if len(m.statsSequence)-1 == i { + m.reachedLast = true + } + + rx += history.RxBytes + } + + wgStats := make(map[string]configurer.WGStats) + wgStats[m.peerID] = configurer.WGStats{ + RxBytes: rx, + } + return wgStats, nil +} + +// fakeTicker is a controllable ticker for use in tests +type fakeTicker struct { + interval time.Duration + timer *FakeTimer + + ch chan time.Time + now time.Time +} + +func newFakeTicker(interval time.Duration, timer *FakeTimer) *fakeTicker { + return &fakeTicker{ + interval: interval, + timer: timer, + ch: make(chan time.Time, 1), + now: timer.Now(), + } +} + +func (f *fakeTicker) C() <-chan time.Time { + f.now = f.now.Add(f.interval) + f.timer.Set(f.now) + f.ch <- f.now + return f.ch +} + +func (f *fakeTicker) Stop() {} + +type FakeTimer struct { + now time.Time +} + +func NewFakeTimer() *FakeTimer { + return &FakeTimer{ + now: time.Date(2025, time.June, 1, 0, 0, 0, 0, time.UTC), + } +} + +func (f *FakeTimer) Set(t time.Time) { + f.now = t +} + +func (f *FakeTimer) Now() time.Time { + return f.now +} diff --git a/client/internal/lazyconn/inactivity/scenarios_test.go b/client/internal/lazyconn/inactivity/scenarios_test.go new file mode 100644 index 000000000..b22a96663 --- /dev/null +++ b/client/internal/lazyconn/inactivity/scenarios_test.go @@ -0,0 +1,103 @@ +package inactivity + +import "time" + +type scenario struct { + ExpectedInactive bool + Data []rxHistory +} + +var scenarios = []scenario{ + { + ExpectedInactive: true, + Data: []rxHistory{ + {when: 0 * time.Second, RxBytes: 32}, + {when: 25 * time.Second, RxBytes: 32}, + {when: 50 * time.Second, RxBytes: 32}, + {when: 75 * time.Second, RxBytes: 32}, + {when: 100 * time.Second, RxBytes: 32}, + {when: 100 * time.Second, RxBytes: 92}, + {when: 150 * time.Second, RxBytes: 32}, + {when: 175 * time.Second, RxBytes: 32}, + {when: 200 * time.Second, RxBytes: 32}, + {when: 225 * time.Second, RxBytes: 32}, + {when: 250 * time.Second, RxBytes: 32}, + {when: 250 * time.Second, RxBytes: 92}, + {when: 300 * time.Second, RxBytes: 32}, + {when: 325 * time.Second, RxBytes: 32}, + {when: 350 * time.Second, RxBytes: 32}, + {when: 375 * time.Second, RxBytes: 32}, + {when: 375 * time.Second, RxBytes: 92}, + {when: 400 * time.Second, RxBytes: 32}, + {when: 425 * time.Second, RxBytes: 32}, + {when: 450 * time.Second, RxBytes: 32}, + {when: 475 * time.Second, RxBytes: 32}, + {when: 500 * time.Second, RxBytes: 32}, + {when: 500 * time.Second, RxBytes: 92}, + {when: 525 * time.Second, RxBytes: 32}, + {when: 550 * time.Second, RxBytes: 32}, + {when: 575 * time.Second, RxBytes: 32}, + {when: 600 * time.Second, RxBytes: 32}, + {when: 625 * time.Second, RxBytes: 32}, + {when: 625 * time.Second, RxBytes: 92}, + {when: 650 * time.Second, RxBytes: 32}, + {when: 675 * time.Second, RxBytes: 32}, + {when: 700 * time.Second, RxBytes: 32}, + {when: 725 * time.Second, RxBytes: 32}, + {when: 750 * time.Second, RxBytes: 32}, + {when: 750 * time.Second, RxBytes: 92}, + {when: 775 * time.Second, RxBytes: 32}, + }, + }, + { + ExpectedInactive: true, + Data: []rxHistory{ + {when: 0 * time.Second, RxBytes: 32}, + {when: 25 * time.Second, RxBytes: 32}, + {when: 50 * time.Second, RxBytes: 32}, + {when: 75 * time.Second, RxBytes: 32}, + {when: 100 * time.Second, RxBytes: 32}, + {when: 100 * time.Second, RxBytes: 148}, + {when: 125 * time.Second, RxBytes: 32}, + {when: 150 * time.Second, RxBytes: 32}, + {when: 175 * time.Second, RxBytes: 32}, + {when: 200 * time.Second, RxBytes: 32}, + {when: 225 * time.Second, RxBytes: 32}, + {when: 225 * time.Second, RxBytes: 148}, + {when: 250 * time.Second, RxBytes: 32}, + {when: 275 * time.Second, RxBytes: 32}, + {when: 300 * time.Second, RxBytes: 32}, + {when: 325 * time.Second, RxBytes: 32}, + {when: 350 * time.Second, RxBytes: 32}, + {when: 350 * time.Second, RxBytes: 148}, + {when: 375 * time.Second, RxBytes: 32}, + {when: 400 * time.Second, RxBytes: 32}, + {when: 425 * time.Second, RxBytes: 32}, + {when: 450 * time.Second, RxBytes: 32}, + {when: 475 * time.Second, RxBytes: 32}, + {when: 475 * time.Second, RxBytes: 148}, + {when: 500 * time.Second, RxBytes: 32}, + {when: 525 * time.Second, RxBytes: 32}, + {when: 550 * time.Second, RxBytes: 32}, + {when: 575 * time.Second, RxBytes: 32}, + {when: 600 * time.Second, RxBytes: 32}, + {when: 600 * time.Second, RxBytes: 148}, + {when: 625 * time.Second, RxBytes: 32}, + {when: 650 * time.Second, RxBytes: 32}, + {when: 675 * time.Second, RxBytes: 32}, + {when: 700 * time.Second, RxBytes: 32}, + {when: 725 * time.Second, RxBytes: 32}, + {when: 725 * time.Second, RxBytes: 148}, + {when: 750 * time.Second, RxBytes: 32}, + }, + }, + { + ExpectedInactive: false, + Data: []rxHistory{ + {when: 0 * time.Second, RxBytes: 32}, + {when: 25 * time.Second, RxBytes: 32}, + {when: 50 * time.Second, RxBytes: 100}, + {when: 75 * time.Second, RxBytes: 32}, + }, + }, +} diff --git a/client/internal/lazyconn/inalt/ticker.go b/client/internal/lazyconn/inactivity/ticker.go similarity index 94% rename from client/internal/lazyconn/inalt/ticker.go rename to client/internal/lazyconn/inactivity/ticker.go index e3a4e4b67..12b64bd5f 100644 --- a/client/internal/lazyconn/inalt/ticker.go +++ b/client/internal/lazyconn/inactivity/ticker.go @@ -1,4 +1,4 @@ -package inalt +package inactivity import "time" diff --git a/client/internal/lazyconn/inalt/manager.go b/client/internal/lazyconn/inalt/manager.go deleted file mode 100644 index 3cc3f6b5e..000000000 --- a/client/internal/lazyconn/inalt/manager.go +++ /dev/null @@ -1,250 +0,0 @@ -package inalt - -import ( - "context" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/netbirdio/netbird/client/iface/configurer" -) - -// Responder: vmp2 -// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes -// - Receive keep alive: 32 bytes, every 25 sec -// Initiator: mp1 -// - Receive handshake response: -// - Receive keep alive: 32 bytes, every 25 sec - -const ( - keepAliveBytes = 32 - keepAliveInterval = 25 * time.Second - handshakeInitBytes = 148 - handshakeRespBytes = 92 - handshakeMaxInterval = 3 * time.Minute - - checkInterval = 5 * time.Second - idleThreshold = 3 - idleCheckDuration = 3 * time.Minute - - // More conservative thresholds accounting for timing variations - protocolOverheadBuffer = 1.5 // 50% buffer for timing variations and extra handshakes - userTrafficMinimum = 1024 // Minimum bytes to consider as actual user activity -) - -type WgInterface interface { - GetStats() (map[string]configurer.WGStats, error) -} - -type peerInfo struct { - lastRxBytesAtLastIdleCheck int64 // cumulative bytes at last 1-minute check - idleCount int - lastIdleCheckAt time.Time - - recentTrafficSamples []int64 - maxSamples int -} - -type Manager struct { - InactivePeersChan chan []string - iface WgInterface - interestedPeers map[string]*peerInfo - - // Dynamic thresholds based on expected patterns - maxProtocolTraffic int64 // Maximum expected for protocol-only traffic - minUserTraffic int64 // Minimum to indicate actual user activity -} - -func NewManager(iface WgInterface) *Manager { - // Calculate maximum expected protocol overhead per check period - numKeepAlives := int(idleCheckDuration / keepAliveInterval) - - // Worst case: multiple handshakes + all keep-alives - // In 3 minutes we might see 1-2 handshakes due to timing variations - maxHandshakes := 2 - maxProtocolBytes := int64(numKeepAlives*keepAliveBytes + maxHandshakes*(handshakeInitBytes+handshakeRespBytes)) - - // Apply buffer for timing variations and edge cases - maxProtocolWithBuffer := int64(float64(maxProtocolBytes) * protocolOverheadBuffer) - - // Set user traffic threshold significantly higher than protocol overhead - minUserBytes := max(userTrafficMinimum, maxProtocolWithBuffer*2) - - log.Infof("--- Protocol thresholds - Max protocol overhead: %d bytes, Min user traffic: %d bytes", - maxProtocolWithBuffer, minUserBytes) - - return &Manager{ - InactivePeersChan: make(chan []string, 1), - iface: iface, - interestedPeers: make(map[string]*peerInfo), - maxProtocolTraffic: maxProtocolWithBuffer, - minUserTraffic: minUserBytes, - } -} - -func (m *Manager) AddPeer(peer string) { - if _, exists := m.interestedPeers[peer]; !exists { - m.interestedPeers[peer] = &peerInfo{ - maxSamples: 5, // Keep last 5 traffic samples for trend analysis - } - } -} - -func (m *Manager) RemovePeer(peer string) { - delete(m.interestedPeers, peer) -} - -func (m *Manager) Start(ctx context.Context) { - ticker := newTicker(checkInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C(): - idlePeers, err := m.checkStats() - if err != nil { - continue - } - - if len(idlePeers) == 0 { - continue - } - select { - case m.InactivePeersChan <- idlePeers: - case <-ctx.Done(): - continue - default: - continue - } - } - } -} - -func (m *Manager) checkStats() ([]string, error) { - stats, err := m.iface.GetStats() - if err != nil { - return nil, err - } - - now := time.Now() - var idlePeers []string - - for peer, info := range m.interestedPeers { - stat, found := stats[peer] - if !found { - continue - } - - // First measurement: initialize - if info.lastIdleCheckAt.IsZero() { - info.lastIdleCheckAt = now - info.lastRxBytesAtLastIdleCheck = stat.RxBytes - continue - } - - minDuration := idleCheckDuration - (checkInterval / 2) - if now.Sub(info.lastIdleCheckAt) >= minDuration { - rxDelta := stat.RxBytes - info.lastRxBytesAtLastIdleCheck - info.lastRxBytesAtLastIdleCheck = stat.RxBytes - - // Store traffic sample for trend analysis - info.recentTrafficSamples = append(info.recentTrafficSamples, rxDelta) - if len(info.recentTrafficSamples) > info.maxSamples { - info.recentTrafficSamples = info.recentTrafficSamples[1:] - } - - log.Infof("--- RxBytes delta: %d, samples: %v", rxDelta, info.recentTrafficSamples) - - // Improved idle detection logic - isIdle := m.evaluateIdleState(peer, info, rxDelta) - - if isIdle { - info.idleCount++ - } else { - info.idleCount = 0 - } - - info.lastIdleCheckAt = now - - if info.idleCount >= idleThreshold { - idlePeers = append(idlePeers, peer) - info.idleCount = 0 // reset after detecting idle - log.Infof("--- detected as idle after %d consecutive checks", idleThreshold) - } - } - } - - return idlePeers, nil -} - -// evaluateIdleState determines if a peer is idle based on traffic patterns -func (m *Manager) evaluateIdleState(peer string, info *peerInfo, currentTraffic int64) bool { - // Clear case: significant user traffic detected - if currentTraffic >= m.minUserTraffic { - log.Infof("--- active - user traffic detected: %d >= %d bytes", currentTraffic, m.minUserTraffic) - return false - } - - // Traffic is within protocol overhead range - likely idle - if currentTraffic <= m.maxProtocolTraffic { - log.Infof("--- idle - only protocol traffic: %d <= %d bytes", currentTraffic, m.maxProtocolTraffic) - return true - } - - // Traffic is between protocol overhead and user traffic thresholds - // This is the ambiguous zone - use trend analysis if available - if len(info.recentTrafficSamples) >= 3 { - avgRecent := m.calculateAverage(info.recentTrafficSamples) - maxRecent := m.findMaximum(info.recentTrafficSamples) - - // If recent average is consistently low and max is also low, likely idle - if avgRecent <= float64(m.maxProtocolTraffic) && maxRecent <= m.maxProtocolTraffic { - log.Infof("--- trending idle - avg: %.2f, max: %d, both <= %d bytes", avgRecent, maxRecent, m.maxProtocolTraffic) - return true - } - - // If we've seen user-level traffic recently, consider active - if maxRecent >= m.minUserTraffic { - log.Infof("--- %s recently active - max recent traffic: %d >= %d bytes", maxRecent, m.minUserTraffic) - return false - } - } - - // In ambiguous cases with insufficient data, be conservative - // Slight preference for idle since this traffic level suggests minimal activity - log.Infof("--- %s ambiguous traffic %d bytes - assuming idle (between %d and %d)", currentTraffic, m.maxProtocolTraffic, m.minUserTraffic) - return true -} - -func (m *Manager) calculateAverage(samples []int64) float64 { - if len(samples) == 0 { - return 0 - } - var sum int64 - for _, sample := range samples { - sum += sample - } - return float64(sum) / float64(len(samples)) -} - -func (m *Manager) findMaximum(samples []int64) int64 { - if len(samples) == 0 { - return 0 - } - maxVal := samples[0] - for _, sample := range samples[1:] { - if sample > maxVal { - maxVal = sample - } - } - return maxVal -} - -func max(a, b int64) int64 { - if a > b { - return a - } - return b -} diff --git a/client/internal/lazyconn/inalt/manager_test.go b/client/internal/lazyconn/inalt/manager_test.go deleted file mode 100644 index c5bb8b763..000000000 --- a/client/internal/lazyconn/inalt/manager_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package inalt - -import ( - "testing" - "time" -) - -func init() { - // Override the ticker factory for testing - newTicker = func(d time.Duration) Ticker { - return newFakeTicker(d) - } -} - -func TestNewManager(t *testing.T) { - -} diff --git a/client/internal/lazyconn/inalt/ticker_moc_test.go b/client/internal/lazyconn/inalt/ticker_moc_test.go deleted file mode 100644 index 2c0625a6f..000000000 --- a/client/internal/lazyconn/inalt/ticker_moc_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package inalt - -import ( - "time" -) - -// fakeTicker is a controllable ticker for use in tests -type fakeTicker struct { - ch chan time.Time - now time.Time - interval time.Duration -} - -func newFakeTicker(d time.Duration) *fakeTicker { - return &fakeTicker{ - ch: make(chan time.Time, 1), - now: time.Now(), - interval: d, - } -} - -// C returns the channel to receive "ticks" — does not push values itself -func (f *fakeTicker) C() <-chan time.Time { - return f.ch -} - -// Tick simulates advancing time and sending a tick -func (f *fakeTicker) Tick() { - f.now = f.now.Add(f.interval) // use your desired interval - f.ch <- f.now -} - -// Stop is a no-op for fakeTicker -func (f *fakeTicker) Stop() {} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 47a22609d..52825f8f7 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -11,7 +11,6 @@ import ( "github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn/activity" "github.com/netbirdio/netbird/client/internal/lazyconn/inactivity" - "github.com/netbirdio/netbird/client/internal/lazyconn/inalt" "github.com/netbirdio/netbird/client/internal/peer/dispatcher" peerid "github.com/netbirdio/netbird/client/internal/peer/id" "github.com/netbirdio/netbird/client/internal/peerstore" @@ -54,7 +53,7 @@ type Manager struct { managedPeersMu sync.Mutex activityManager *activity.Manager - inactivityManager *inalt.Manager + inactivityManager *inactivity.Manager // Route HA group management peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to @@ -75,7 +74,7 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S managedPeersByConnID: make(map[peerid.ConnID]*managedPeer), excludes: make(map[string]lazyconn.PeerConfig), activityManager: activity.NewManager(wgIface), - inactivityManager: inalt.NewManager(wgIface), + inactivityManager: inactivity.NewManager(wgIface), peerToHAGroups: make(map[string][]route.HAUniqueID), haGroupToPeers: make(map[route.HAUniqueID][]string), } @@ -145,13 +144,10 @@ func (m *Manager) Start(ctx context.Context) { return case peerConnID := <-m.activityManager.OnActivityChan: m.onPeerActivity(ctx, peerConnID) - case _ = <-m.inactivityManager.InactivePeersChan: - /* - for _, peerID := range peerIDs { - m.onPeerInactivityTimedOut(peerID) - } - - */ + case peerIDs := <-m.inactivityManager.InactivePeersChan: + for _, peerID := range peerIDs { + m.onPeerInactivityTimedOut(peerID) + } } } } @@ -192,7 +188,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo peerCfg.Log.Infof("peer removed from lazy connection exclude list") - if err := m.addActivePeer(ctx, peerCfg); err != nil { + if err := m.addActivePeer(&peerCfg); err != nil { log.Errorf("failed to add peer to lazy connection manager: %s", err) continue } @@ -222,7 +218,7 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { return false, err } - m.inactivityManager.AddPeer(peerCfg.PublicKey) + m.inactivityManager.AddPeer(&peerCfg) m.managedPeers[peerCfg.PublicKey] = &peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ @@ -244,7 +240,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon continue } - if err := m.addActivePeer(ctx, cfg); err != nil { + if err := m.addActivePeer(&cfg); err != nil { cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err) return err } @@ -306,7 +302,7 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey) - m.inactivityManager.AddPeer(cfg.PublicKey) + m.inactivityManager.AddPeer(cfg) return true } @@ -352,20 +348,20 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string } } -func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error { +func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error { if _, ok := m.managedPeers[peerCfg.PublicKey]; ok { peerCfg.Log.Warnf("peer already managed") return nil } - m.managedPeers[peerCfg.PublicKey] = &peerCfg + m.managedPeers[peerCfg.PublicKey] = peerCfg m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ - peerCfg: &peerCfg, + peerCfg: peerCfg, expectedWatcher: watcherInactivity, } peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list") - m.inactivityManager.AddPeer(peerCfg.PublicKey) + m.inactivityManager.AddPeer(peerCfg) return nil } @@ -480,7 +476,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) { } mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected") - m.inactivityManager.AddPeer(mp.peerCfg.PublicKey) + m.inactivityManager.AddPeer(mp.peerCfg) } func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {