Add new algorithm

Zoltan Papp
2025-06-14 02:25:05 +02:00
parent d24d8328f9
commit 93d8d272bf
5 changed files with 356 additions and 49 deletions

View File

@@ -0,0 +1,250 @@
package inalt
import (
	"context"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/netbirdio/netbird/client/iface/configurer"
)
// Responder: vmp2
// - Receives handshake initiation: 148 bytes (plus an extra 32 bytes), every 2-3 minutes
// - Receives keepalive: 32 bytes, every 25 sec
// Initiator: mp1
// - Receives handshake response: 92 bytes
// - Receives keepalive: 32 bytes, every 25 sec
const (
keepAliveBytes = 32
keepAliveInterval = 25 * time.Second
handshakeInitBytes = 148
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = 5 * time.Second
idleThreshold = 3
idleCheckDuration = 3 * time.Minute
// More conservative thresholds accounting for timing variations
protocolOverheadBuffer = 1.5 // 50% buffer for timing variations and extra handshakes
userTrafficMinimum = 1024 // Minimum bytes to consider as actual user activity
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerInfo struct {
	lastRxBytesAtLastIdleCheck int64 // cumulative RxBytes at the last idle-check window
idleCount int
lastIdleCheckAt time.Time
recentTrafficSamples []int64
maxSamples int
}
type Manager struct {
	InactivePeersChan chan []string

	iface           WgInterface
	interestedPeers map[string]*peerInfo
	// mu guards interestedPeers: AddPeer/RemovePeer may be called while
	// Start's goroutine reads the map in checkStats.
	mu sync.Mutex

	// Dynamic thresholds based on expected patterns
	maxProtocolTraffic int64 // Maximum expected for protocol-only traffic
	minUserTraffic     int64 // Minimum to indicate actual user activity
}
func NewManager(iface WgInterface) *Manager {
// Calculate maximum expected protocol overhead per check period
numKeepAlives := int(idleCheckDuration / keepAliveInterval)
// Worst case: multiple handshakes + all keep-alives
// In 3 minutes we might see 1-2 handshakes due to timing variations
maxHandshakes := 2
maxProtocolBytes := int64(numKeepAlives*keepAliveBytes + maxHandshakes*(handshakeInitBytes+handshakeRespBytes))
// Apply buffer for timing variations and edge cases
maxProtocolWithBuffer := int64(float64(maxProtocolBytes) * protocolOverheadBuffer)
// Set user traffic threshold significantly higher than protocol overhead
minUserBytes := max(userTrafficMinimum, maxProtocolWithBuffer*2)
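	// With the defaults this evaluates to: 7 keepalives (180s/25s) at 32 bytes
	// plus 2 handshakes at (148+92) bytes = 704 bytes; 704 * 1.5 = 1056 bytes of
	// tolerated protocol overhead, and minUserBytes = max(1024, 2*1056) = 2112.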
log.Infof("--- Protocol thresholds - Max protocol overhead: %d bytes, Min user traffic: %d bytes",
maxProtocolWithBuffer, minUserBytes)
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerInfo),
maxProtocolTraffic: maxProtocolWithBuffer,
minUserTraffic: minUserBytes,
}
}
func (m *Manager) AddPeer(peer string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.interestedPeers[peer]; !exists {
		m.interestedPeers[peer] = &peerInfo{
			maxSamples: 5, // keep the last 5 traffic samples for trend analysis
		}
	}
}

func (m *Manager) RemovePeer(peer string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.interestedPeers, peer)
}
func (m *Manager) Start(ctx context.Context) {
ticker := newTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C():
idlePeers, err := m.checkStats()
			if err != nil {
				log.Errorf("failed to read WireGuard stats: %v", err)
				continue
			}
if len(idlePeers) == 0 {
continue
}
select {
case m.InactivePeersChan <- idlePeers:
case <-ctx.Done():
continue
default:
continue
}
}
}
}
func (m *Manager) checkStats() ([]string, error) {
	stats, err := m.iface.GetStats()
	if err != nil {
		return nil, err
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	now := time.Now()
var idlePeers []string
for peer, info := range m.interestedPeers {
stat, found := stats[peer]
if !found {
continue
}
// First measurement: initialize
if info.lastIdleCheckAt.IsZero() {
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
continue
}
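		// Accept the window up to half a tick early so ticker jitter does not
		// stretch every window past idleCheckDuration and delay the check.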
minDuration := idleCheckDuration - (checkInterval / 2)
if now.Sub(info.lastIdleCheckAt) >= minDuration {
rxDelta := stat.RxBytes - info.lastRxBytesAtLastIdleCheck
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
// Store traffic sample for trend analysis
info.recentTrafficSamples = append(info.recentTrafficSamples, rxDelta)
if len(info.recentTrafficSamples) > info.maxSamples {
info.recentTrafficSamples = info.recentTrafficSamples[1:]
}
			log.Infof("--- %s RxBytes delta: %d, samples: %v", peer, rxDelta, info.recentTrafficSamples)
// Improved idle detection logic
isIdle := m.evaluateIdleState(peer, info, rxDelta)
if isIdle {
info.idleCount++
} else {
info.idleCount = 0
}
info.lastIdleCheckAt = now
if info.idleCount >= idleThreshold {
idlePeers = append(idlePeers, peer)
info.idleCount = 0 // reset after detecting idle
				log.Infof("--- %s detected as idle after %d consecutive checks", peer, idleThreshold)
}
}
}
return idlePeers, nil
}
// evaluateIdleState determines if a peer is idle based on traffic patterns
func (m *Manager) evaluateIdleState(peer string, info *peerInfo, currentTraffic int64) bool {
// Clear case: significant user traffic detected
if currentTraffic >= m.minUserTraffic {
		log.Infof("--- %s active - user traffic detected: %d >= %d bytes", peer, currentTraffic, m.minUserTraffic)
return false
}
// Traffic is within protocol overhead range - likely idle
if currentTraffic <= m.maxProtocolTraffic {
		log.Infof("--- %s idle - only protocol traffic: %d <= %d bytes", peer, currentTraffic, m.maxProtocolTraffic)
return true
}
// Traffic is between protocol overhead and user traffic thresholds
// This is the ambiguous zone - use trend analysis if available
if len(info.recentTrafficSamples) >= 3 {
avgRecent := m.calculateAverage(info.recentTrafficSamples)
maxRecent := m.findMaximum(info.recentTrafficSamples)
// If recent average is consistently low and max is also low, likely idle
if avgRecent <= float64(m.maxProtocolTraffic) && maxRecent <= m.maxProtocolTraffic {
			log.Infof("--- %s trending idle - avg: %.2f, max: %d, both <= %d bytes", peer, avgRecent, maxRecent, m.maxProtocolTraffic)
return true
}
// If we've seen user-level traffic recently, consider active
if maxRecent >= m.minUserTraffic {
			log.Infof("--- %s recently active - max recent traffic: %d >= %d bytes", peer, maxRecent, m.minUserTraffic)
return false
}
}
// In ambiguous cases with insufficient data, be conservative
// Slight preference for idle since this traffic level suggests minimal activity
	log.Infof("--- %s ambiguous traffic %d bytes - assuming idle (between %d and %d)", peer, currentTraffic, m.maxProtocolTraffic, m.minUserTraffic)
return true
}
func (m *Manager) calculateAverage(samples []int64) float64 {
if len(samples) == 0 {
return 0
}
var sum int64
for _, sample := range samples {
sum += sample
}
return float64(sum) / float64(len(samples))
}
func (m *Manager) findMaximum(samples []int64) int64 {
if len(samples) == 0 {
return 0
}
maxVal := samples[0]
for _, sample := range samples[1:] {
if sample > maxVal {
maxVal = sample
}
}
return maxVal
}
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}
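
// A minimal usage sketch (illustrative, not part of this commit): the caller
// supplies a WgInterface implementation and drains InactivePeersChan.
//
//	m := NewManager(wgIface)
//	m.AddPeer("peer-public-key")
//	go m.Start(ctx)
//	for peers := range m.InactivePeersChan {
//		// tear down or park connections for the reported peers
//	}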

View File

@@ -0,0 +1,17 @@
package inalt
import (
"testing"
"time"
)
func init() {
// Override the ticker factory for testing
newTicker = func(d time.Duration) Ticker {
return newFakeTicker(d)
}
}
func TestNewManager(t *testing.T) {
	// NewManager never dereferences the iface, so nil is safe here.
	m := NewManager(nil)
	if m.InactivePeersChan == nil {
		t.Fatal("InactivePeersChan should be initialized")
	}
}
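
// A minimal sketch of a behavioural test for the idle heuristic; the expected
// zones follow from the constants in manager.go (up to 1056 bytes counts as
// protocol overhead, 2112 bytes and above counts as user traffic).
func TestEvaluateIdleState(t *testing.T) {
	m := NewManager(nil)
	info := &peerInfo{maxSamples: 5}

	if !m.evaluateIdleState("peer", info, 100) {
		t.Error("traffic within protocol overhead should count as idle")
	}
	if m.evaluateIdleState("peer", info, 5000) {
		t.Error("traffic above the user minimum should count as active")
	}
	if !m.evaluateIdleState("peer", info, 1500) {
		t.Error("ambiguous traffic with no samples should default to idle")
	}
}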

View File

@@ -0,0 +1,24 @@
package inalt
import "time"
var newTicker = func(d time.Duration) Ticker {
return &realTicker{t: time.NewTicker(d)}
}
type Ticker interface {
C() <-chan time.Time
Stop()
}
type realTicker struct {
t *time.Ticker
}
func (r *realTicker) C() <-chan time.Time {
return r.t.C
}
func (r *realTicker) Stop() {
r.t.Stop()
}

View File

@@ -0,0 +1,34 @@
package inalt
import (
"time"
)
// fakeTicker is a controllable ticker for use in tests
type fakeTicker struct {
ch chan time.Time
now time.Time
interval time.Duration
}
func newFakeTicker(d time.Duration) *fakeTicker {
return &fakeTicker{
ch: make(chan time.Time, 1),
now: time.Now(),
interval: d,
}
}
// C returns the channel on which ticks are delivered; the fake never sends on it by itself
func (f *fakeTicker) C() <-chan time.Time {
return f.ch
}
// Tick advances the fake clock by one interval and delivers a tick
func (f *fakeTicker) Tick() {
	f.now = f.now.Add(f.interval)
	f.ch <- f.now
}
// Stop is a no-op for fakeTicker
func (f *fakeTicker) Stop() {}
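
// Hypothetical usage sketch (stubIface is an assumed test double, not part of
// this package): with newTicker overridden as in the test file's init, a test
// drives Manager.Start deterministically instead of sleeping:
//
//	ft := newFakeTicker(checkInterval)
//	newTicker = func(time.Duration) Ticker { return ft }
//	m := NewManager(&stubIface{})
//	go m.Start(ctx)
//	ft.Tick() // fires one checkStats pass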

View File

@@ -11,6 +11,7 @@ import (
 	"github.com/netbirdio/netbird/client/internal/lazyconn"
 	"github.com/netbirdio/netbird/client/internal/lazyconn/activity"
 	"github.com/netbirdio/netbird/client/internal/lazyconn/inactivity"
+	"github.com/netbirdio/netbird/client/internal/lazyconn/inalt"
 	"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
 	peerid "github.com/netbirdio/netbird/client/internal/peer/id"
 	"github.com/netbirdio/netbird/client/internal/peerstore"
@@ -52,15 +53,13 @@ type Manager struct {
 	excludes       map[string]lazyconn.PeerConfig
 	managedPeersMu sync.Mutex

 	activityManager    *activity.Manager
-	inactivityMonitors map[peerid.ConnID]*inactivity.Monitor
+	inactivityManager  *inalt.Manager

 	// Route HA group management
 	peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
 	haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
 	routesMu       sync.RWMutex                  // protects route mappings
-
-	onInactive chan peerid.ConnID
 }

 // NewManager creates a new lazy connection manager
@@ -76,10 +75,9 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
 		managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
 		excludes:             make(map[string]lazyconn.PeerConfig),
 		activityManager:      activity.NewManager(wgIface),
-		inactivityMonitors:   make(map[peerid.ConnID]*inactivity.Monitor),
+		inactivityManager:    inalt.NewManager(wgIface),
 		peerToHAGroups:       make(map[string][]route.HAUniqueID),
 		haGroupToPeers:       make(map[route.HAUniqueID][]string),
-		onInactive:           make(chan peerid.ConnID),
 	}

 	if config.InactivityThreshold != nil {
@@ -139,14 +137,21 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) {
 func (m *Manager) Start(ctx context.Context) {
 	defer m.close()

+	go m.inactivityManager.Start(ctx)
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case peerConnID := <-m.activityManager.OnActivityChan:
 			m.onPeerActivity(ctx, peerConnID)
-		case peerConnID := <-m.onInactive:
-			m.onPeerInactivityTimedOut(peerConnID)
+		case _ = <-m.inactivityManager.InactivePeersChan:
+			/*
+				for _, peerID := range peerIDs {
+					m.onPeerInactivityTimedOut(peerID)
+				}
+			*/
 		}
 	}
 }
@@ -217,8 +222,7 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
 		return false, err
 	}

-	im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
-	m.inactivityMonitors[peerCfg.PeerConnID] = im
+	m.inactivityManager.AddPeer(peerCfg.PublicKey)

 	m.managedPeers[peerCfg.PublicKey] = &peerCfg
 	m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
@@ -301,14 +305,8 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf
 	m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)

-	im, ok := m.inactivityMonitors[cfg.PeerConnID]
-	if !ok {
-		cfg.Log.Errorf("inactivity monitor not found for peer")
-		return false
-	}
-	cfg.Log.Infof("starting inactivity monitor")
-	go im.Start(ctx, m.onInactive)
+	cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey)
+	m.inactivityManager.AddPeer(cfg.PublicKey)

 	return true
 }
@@ -360,9 +358,6 @@ func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig
 		return nil
 	}

-	im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
-	m.inactivityMonitors[peerCfg.PeerConnID] = im
-
 	m.managedPeers[peerCfg.PublicKey] = &peerCfg
 	m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
 		peerCfg: &peerCfg,
@@ -370,7 +365,7 @@ func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig
 	}
 	peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list")
-	go im.Start(ctx, m.onInactive)
+	m.inactivityManager.AddPeer(peerCfg.PublicKey)
 	return nil
 }
@@ -382,12 +377,7 @@ func (m *Manager) removePeer(peerID string) {
 	cfg.Log.Infof("removing lazy peer")

-	if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
-		im.Stop()
-		delete(m.inactivityMonitors, cfg.PeerConnID)
-		cfg.Log.Debugf("inactivity monitor stopped")
-	}
+	m.inactivityManager.RemovePeer(cfg.PublicKey)
 	m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
 	delete(m.managedPeers, peerID)
 	delete(m.managedPeersByConnID, cfg.PeerConnID)
@@ -399,10 +389,7 @@ func (m *Manager) close() {
 	m.connStateDispatcher.RemoveListener(m.connStateListener)
 	m.activityManager.Close()
-	for _, iw := range m.inactivityMonitors {
-		iw.Stop()
-	}
-	m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor)

 	m.managedPeers = make(map[string]*lazyconn.PeerConfig)
 	m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer)
@@ -441,13 +428,19 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
 	m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
 }

-func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
+func (m *Manager) onPeerInactivityTimedOut(peerID string) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

-	mp, ok := m.managedPeersByConnID[peerConnID]
+	peerCfg, ok := m.managedPeers[peerID]
 	if !ok {
-		log.Errorf("peer not found by id: %v", peerConnID)
+		log.Errorf("peer not found by peerId: %v", peerID)
+		return
+	}
+
+	mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
+	if !ok {
+		log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID)
 		return
 	}
@@ -465,8 +458,7 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
 	mp.expectedWatcher = watcherActivity

-	// just in case free up
-	m.inactivityMonitors[peerConnID].PauseTimer()
+	m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)

 	if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
 		mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
@@ -487,14 +479,8 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
 		return
 	}

-	iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
-	if !ok {
-		mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
-		return
-	}
-
 	mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected")
-	iw.PauseTimer()
+	m.inactivityManager.AddPeer(mp.peerCfg.PublicKey)
 }

 func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
@@ -510,11 +496,7 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
 		return
 	}

-	iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
-	if !ok {
-		return
-	}
-
+	// todo reset inactivity monitor
 	mp.peerCfg.Log.Infof("reset inactivity monitor timer")
-	iw.ResetTimer()
 }