Change the logic and add moc data

This commit is contained in:
Zoltán Papp
2025-06-18 15:31:57 +02:00
parent b3c0b46a88
commit f3a5e34c3f
12 changed files with 442 additions and 547 deletions

View File

@@ -1,70 +0,0 @@
package inactivity
import (
"context"
"time"
peer "github.com/netbirdio/netbird/client/internal/peer/id"
)
const (
DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
MinimumInactivityThreshold = 3 * time.Minute
)
type Monitor struct {
id peer.ConnID
timer *time.Timer
cancel context.CancelFunc
inactivityThreshold time.Duration
}
func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor {
i := &Monitor{
id: peerID,
timer: time.NewTimer(0),
inactivityThreshold: threshold,
}
i.timer.Stop()
return i
}
func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
i.timer.Reset(i.inactivityThreshold)
defer i.timer.Stop()
ctx, i.cancel = context.WithCancel(ctx)
defer func() {
defer i.cancel()
select {
case <-i.timer.C:
default:
}
}()
select {
case <-i.timer.C:
select {
case timeoutChan <- i.id:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
func (i *Monitor) Stop() {
if i.cancel == nil {
return
}
i.cancel()
}
func (i *Monitor) PauseTimer() {
i.timer.Stop()
}
func (i *Monitor) ResetTimer() {
i.timer.Reset(i.inactivityThreshold)
}

View File

@@ -1,156 +0,0 @@
package inactivity
import (
"context"
"testing"
"time"
peerid "github.com/netbirdio/netbird/client/internal/peer/id"
)
type MocPeer struct {
}
func (m *MocPeer) ConnID() peerid.ConnID {
return peerid.ConnID(m)
}
func TestInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestReuseInactivityMonitor(t *testing.T) {
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
for i := 2; i > 0; i-- {
exitChan := make(chan struct{})
testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
go func() {
defer close(exitChan)
im.Start(testTimeoutCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
testTimeoutCancel()
}
}
func TestStopInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
go func() {
time.Sleep(3 * time.Second)
im.Stop()
}()
select {
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestPauseInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10)
defer testTimeoutCancel()
p := &MocPeer{}
trashHold := time.Second * 3
im := NewInactivityMonitor(p.ConnID(), trashHold)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(ctx, timeoutChan)
}()
time.Sleep(1 * time.Second) // grant time to start the monitor
im.PauseTimer()
// check to do not receive timeout
thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second)
defer thresholdCancel()
select {
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-thresholdCtx.Done():
// test ok
case <-tCtx.Done():
t.Fatal("test timed out")
}
// test reset timer
im.ResetTimer()
select {
case <-tCtx.Done():
t.Fatal("test timed out")
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
// expected timeout
}
}

View File

@@ -0,0 +1,174 @@
package inactivity
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
// Responder: vmp2
// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes
// - Receive keep alive: 32 bytes, every 25 sec
// Initiator: mp1
// - Receive handshake response:
// - Receive keep alive: 32 bytes, every 25 sec
const (
keepAliveBytes = 32
keepAliveInterval = 25 * time.Second
handshakeInitBytes = 148
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = keepAliveInterval // todo: 5 * time.Second
keepAliveCheckPeriod = keepAliveInterval
)
const (
// todo make it configurable
DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
MinimumInactivityThreshold = 3 * time.Minute
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerInfo struct {
lastRxBytesAtLastIdleCheck int64
lastIdleCheckAt time.Time
inActivityInRow int
log *log.Entry
}
type Manager struct {
InactivePeersChan chan []string
iface WgInterface
interestedPeers map[string]*peerInfo
}
func NewManager(iface WgInterface) *Manager {
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerInfo),
}
}
func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) {
if _, exists := m.interestedPeers[peerCfg.PublicKey]; !exists {
m.interestedPeers[peerCfg.PublicKey] = &peerInfo{
log: peerCfg.Log,
}
}
}
func (m *Manager) RemovePeer(peer string) {
delete(m.interestedPeers, peer)
}
func (m *Manager) Start(ctx context.Context) {
ticker := newTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case tickTime := <-ticker.C():
idlePeers, err := m.checkStats(tickTime)
if err != nil {
log.Errorf("error checking stats: %v", err)
return
}
if len(idlePeers) == 0 {
continue
}
select {
case m.InactivePeersChan <- idlePeers:
case <-ctx.Done():
continue
default:
continue
}
}
}
}
func (m *Manager) checkStats(now time.Time) ([]string, error) {
stats, err := m.iface.GetStats()
if err != nil {
return nil, err
}
var idlePeers []string
for peer, info := range m.interestedPeers {
stat, found := stats[peer]
if !found {
info.log.Warnf("peer not found in wg stats")
continue
}
// First measurement: initialize
if info.lastIdleCheckAt.IsZero() {
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
info.log.Infof("initializing RxBytes: %v, %v", now, stat.RxBytes)
continue
}
// check only every idleCheckDuration
if shouldSkipIdleCheck(now, info.lastIdleCheckAt) {
continue
}
// sometimes we measure false inactivity, so we need to check if we have activity in a row
inactive := isInactive(stat, info)
if inactive {
info.inActivityInRow++
} else {
info.inActivityInRow = 0
}
if info.inActivityInRow >= 3 {
info.log.Infof("peer is inactive for %d checks, marking as inactive", info.inActivityInRow)
idlePeers = append(idlePeers, peer)
}
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
}
return idlePeers, nil
}
func isInactive(stat configurer.WGStats, info *peerInfo) bool {
rxSyncPrevPeriod := stat.RxBytes - info.lastRxBytesAtLastIdleCheck
switch rxSyncPrevPeriod {
case 0:
info.log.Tracef("peer inactive, received 0 bytes")
return true
case keepAliveBytes:
info.log.Tracef("peer inactive, only keep alive received, current RxBytes: %d", rxSyncPrevPeriod)
return true
case handshakeInitBytes + keepAliveBytes:
info.log.Tracef("peer inactive, only handshakeInitBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod)
return true
case handshakeRespBytes + keepAliveBytes:
info.log.Tracef("peer inactive, only handshakeRespBytes + keepAliveBytes, current RxBytes: %d", rxSyncPrevPeriod)
return true
default:
info.log.Infof("active, RxBytes: %d", rxSyncPrevPeriod)
return false
}
}
func shouldSkipIdleCheck(now, lastIdleCheckAt time.Time) bool {
minDuration := keepAliveCheckPeriod - (checkInterval / 2)
return now.Sub(lastIdleCheckAt) < minDuration
}

View File

@@ -0,0 +1,46 @@
package inactivity
import (
"context"
"fmt"
"testing"
"time"
"github.com/netbirdio/netbird/util"
)
func init() {
_ = util.InitLog("trace", "console")
}
func TestNewManager(t *testing.T) {
for i, sc := range scenarios {
timer := NewFakeTimer()
newTicker = func(d time.Duration) Ticker {
return newFakeTicker(d, timer)
}
t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) {
mock := newMockWgInterface("peer1", sc.Data, timer)
manager := NewManager(mock)
manager.AddPeer("peer1")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager.Start(ctx)
var inactiveResult bool
select {
case <-manager.InactivePeersChan:
inactiveResult = true
default:
inactiveResult = false
}
if inactiveResult != sc.ExpectedInactive {
t.Errorf("Expected inactive peers: %v, got: %v", sc.ExpectedInactive, inactiveResult)
}
})
}
}

View File

@@ -0,0 +1,102 @@
package inactivity
import (
"fmt"
"time"
"github.com/netbirdio/netbird/client/iface/configurer"
)
type rxHistory struct {
when time.Duration
RxBytes int64
}
// mockWgInterface mocks WgInterface to simulate peer stats.
type mockWgInterface struct {
peerID string
statsSequence []rxHistory
timer *FakeTimer
initialTime time.Time
reachedLast bool
}
func newMockWgInterface(peerID string, history []rxHistory, timer *FakeTimer) *mockWgInterface {
return &mockWgInterface{
peerID: peerID,
statsSequence: history,
timer: timer,
initialTime: timer.Now(),
}
}
func (m *mockWgInterface) GetStats() (map[string]configurer.WGStats, error) {
if m.reachedLast {
return nil, fmt.Errorf("no more data")
}
now := m.timer.Now()
var rx int64
for i, history := range m.statsSequence {
if now.Before(m.initialTime.Add(history.when)) {
break
}
if len(m.statsSequence)-1 == i {
m.reachedLast = true
}
rx += history.RxBytes
}
wgStats := make(map[string]configurer.WGStats)
wgStats[m.peerID] = configurer.WGStats{
RxBytes: rx,
}
return wgStats, nil
}
// fakeTicker is a controllable ticker for use in tests
type fakeTicker struct {
interval time.Duration
timer *FakeTimer
ch chan time.Time
now time.Time
}
func newFakeTicker(interval time.Duration, timer *FakeTimer) *fakeTicker {
return &fakeTicker{
interval: interval,
timer: timer,
ch: make(chan time.Time, 1),
now: timer.Now(),
}
}
func (f *fakeTicker) C() <-chan time.Time {
f.now = f.now.Add(f.interval)
f.timer.Set(f.now)
f.ch <- f.now
return f.ch
}
func (f *fakeTicker) Stop() {}
type FakeTimer struct {
now time.Time
}
func NewFakeTimer() *FakeTimer {
return &FakeTimer{
now: time.Date(2025, time.June, 1, 0, 0, 0, 0, time.UTC),
}
}
func (f *FakeTimer) Set(t time.Time) {
f.now = t
}
func (f *FakeTimer) Now() time.Time {
return f.now
}

View File

@@ -0,0 +1,103 @@
package inactivity
import "time"
type scenario struct {
ExpectedInactive bool
Data []rxHistory
}
var scenarios = []scenario{
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 92},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 92},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 92},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 92},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 92},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 92},
{when: 775 * time.Second, RxBytes: 32},
},
},
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 148},
{when: 125 * time.Second, RxBytes: 32},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 148},
{when: 250 * time.Second, RxBytes: 32},
{when: 275 * time.Second, RxBytes: 32},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 148},
{when: 375 * time.Second, RxBytes: 32},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 148},
{when: 500 * time.Second, RxBytes: 32},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 148},
{when: 625 * time.Second, RxBytes: 32},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 148},
{when: 750 * time.Second, RxBytes: 32},
},
},
{
ExpectedInactive: false,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 100},
{when: 75 * time.Second, RxBytes: 32},
},
},
}

View File

@@ -1,4 +1,4 @@
package inalt
package inactivity
import "time"

View File

@@ -1,250 +0,0 @@
package inalt
import (
"context"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/configurer"
)
// Responder: vmp2
// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes
// - Receive keep alive: 32 bytes, every 25 sec
// Initiator: mp1
// - Receive handshake response:
// - Receive keep alive: 32 bytes, every 25 sec
const (
keepAliveBytes = 32
keepAliveInterval = 25 * time.Second
handshakeInitBytes = 148
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = 5 * time.Second
idleThreshold = 3
idleCheckDuration = 3 * time.Minute
// More conservative thresholds accounting for timing variations
protocolOverheadBuffer = 1.5 // 50% buffer for timing variations and extra handshakes
userTrafficMinimum = 1024 // Minimum bytes to consider as actual user activity
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerInfo struct {
lastRxBytesAtLastIdleCheck int64 // cumulative bytes at last 1-minute check
idleCount int
lastIdleCheckAt time.Time
recentTrafficSamples []int64
maxSamples int
}
type Manager struct {
InactivePeersChan chan []string
iface WgInterface
interestedPeers map[string]*peerInfo
// Dynamic thresholds based on expected patterns
maxProtocolTraffic int64 // Maximum expected for protocol-only traffic
minUserTraffic int64 // Minimum to indicate actual user activity
}
func NewManager(iface WgInterface) *Manager {
// Calculate maximum expected protocol overhead per check period
numKeepAlives := int(idleCheckDuration / keepAliveInterval)
// Worst case: multiple handshakes + all keep-alives
// In 3 minutes we might see 1-2 handshakes due to timing variations
maxHandshakes := 2
maxProtocolBytes := int64(numKeepAlives*keepAliveBytes + maxHandshakes*(handshakeInitBytes+handshakeRespBytes))
// Apply buffer for timing variations and edge cases
maxProtocolWithBuffer := int64(float64(maxProtocolBytes) * protocolOverheadBuffer)
// Set user traffic threshold significantly higher than protocol overhead
minUserBytes := max(userTrafficMinimum, maxProtocolWithBuffer*2)
log.Infof("--- Protocol thresholds - Max protocol overhead: %d bytes, Min user traffic: %d bytes",
maxProtocolWithBuffer, minUserBytes)
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerInfo),
maxProtocolTraffic: maxProtocolWithBuffer,
minUserTraffic: minUserBytes,
}
}
func (m *Manager) AddPeer(peer string) {
if _, exists := m.interestedPeers[peer]; !exists {
m.interestedPeers[peer] = &peerInfo{
maxSamples: 5, // Keep last 5 traffic samples for trend analysis
}
}
}
func (m *Manager) RemovePeer(peer string) {
delete(m.interestedPeers, peer)
}
func (m *Manager) Start(ctx context.Context) {
ticker := newTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C():
idlePeers, err := m.checkStats()
if err != nil {
continue
}
if len(idlePeers) == 0 {
continue
}
select {
case m.InactivePeersChan <- idlePeers:
case <-ctx.Done():
continue
default:
continue
}
}
}
}
func (m *Manager) checkStats() ([]string, error) {
stats, err := m.iface.GetStats()
if err != nil {
return nil, err
}
now := time.Now()
var idlePeers []string
for peer, info := range m.interestedPeers {
stat, found := stats[peer]
if !found {
continue
}
// First measurement: initialize
if info.lastIdleCheckAt.IsZero() {
info.lastIdleCheckAt = now
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
continue
}
minDuration := idleCheckDuration - (checkInterval / 2)
if now.Sub(info.lastIdleCheckAt) >= minDuration {
rxDelta := stat.RxBytes - info.lastRxBytesAtLastIdleCheck
info.lastRxBytesAtLastIdleCheck = stat.RxBytes
// Store traffic sample for trend analysis
info.recentTrafficSamples = append(info.recentTrafficSamples, rxDelta)
if len(info.recentTrafficSamples) > info.maxSamples {
info.recentTrafficSamples = info.recentTrafficSamples[1:]
}
log.Infof("--- RxBytes delta: %d, samples: %v", rxDelta, info.recentTrafficSamples)
// Improved idle detection logic
isIdle := m.evaluateIdleState(peer, info, rxDelta)
if isIdle {
info.idleCount++
} else {
info.idleCount = 0
}
info.lastIdleCheckAt = now
if info.idleCount >= idleThreshold {
idlePeers = append(idlePeers, peer)
info.idleCount = 0 // reset after detecting idle
log.Infof("--- detected as idle after %d consecutive checks", idleThreshold)
}
}
}
return idlePeers, nil
}
// evaluateIdleState determines if a peer is idle based on traffic patterns
func (m *Manager) evaluateIdleState(peer string, info *peerInfo, currentTraffic int64) bool {
// Clear case: significant user traffic detected
if currentTraffic >= m.minUserTraffic {
log.Infof("--- active - user traffic detected: %d >= %d bytes", currentTraffic, m.minUserTraffic)
return false
}
// Traffic is within protocol overhead range - likely idle
if currentTraffic <= m.maxProtocolTraffic {
log.Infof("--- idle - only protocol traffic: %d <= %d bytes", currentTraffic, m.maxProtocolTraffic)
return true
}
// Traffic is between protocol overhead and user traffic thresholds
// This is the ambiguous zone - use trend analysis if available
if len(info.recentTrafficSamples) >= 3 {
avgRecent := m.calculateAverage(info.recentTrafficSamples)
maxRecent := m.findMaximum(info.recentTrafficSamples)
// If recent average is consistently low and max is also low, likely idle
if avgRecent <= float64(m.maxProtocolTraffic) && maxRecent <= m.maxProtocolTraffic {
log.Infof("--- trending idle - avg: %.2f, max: %d, both <= %d bytes", avgRecent, maxRecent, m.maxProtocolTraffic)
return true
}
// If we've seen user-level traffic recently, consider active
if maxRecent >= m.minUserTraffic {
log.Infof("--- %s recently active - max recent traffic: %d >= %d bytes", maxRecent, m.minUserTraffic)
return false
}
}
// In ambiguous cases with insufficient data, be conservative
// Slight preference for idle since this traffic level suggests minimal activity
log.Infof("--- %s ambiguous traffic %d bytes - assuming idle (between %d and %d)", currentTraffic, m.maxProtocolTraffic, m.minUserTraffic)
return true
}
func (m *Manager) calculateAverage(samples []int64) float64 {
if len(samples) == 0 {
return 0
}
var sum int64
for _, sample := range samples {
sum += sample
}
return float64(sum) / float64(len(samples))
}
func (m *Manager) findMaximum(samples []int64) int64 {
if len(samples) == 0 {
return 0
}
maxVal := samples[0]
for _, sample := range samples[1:] {
if sample > maxVal {
maxVal = sample
}
}
return maxVal
}
func max(a, b int64) int64 {
if a > b {
return a
}
return b
}

View File

@@ -1,17 +0,0 @@
package inalt
import (
"testing"
"time"
)
func init() {
// Override the ticker factory for testing
newTicker = func(d time.Duration) Ticker {
return newFakeTicker(d)
}
}
func TestNewManager(t *testing.T) {
}

View File

@@ -1,34 +0,0 @@
package inalt
import (
"time"
)
// fakeTicker is a controllable ticker for use in tests
type fakeTicker struct {
ch chan time.Time
now time.Time
interval time.Duration
}
func newFakeTicker(d time.Duration) *fakeTicker {
return &fakeTicker{
ch: make(chan time.Time, 1),
now: time.Now(),
interval: d,
}
}
// C returns the channel to receive "ticks" — does not push values itself
func (f *fakeTicker) C() <-chan time.Time {
return f.ch
}
// Tick simulates advancing time and sending a tick
func (f *fakeTicker) Tick() {
f.now = f.now.Add(f.interval) // use your desired interval
f.ch <- f.now
}
// Stop is a no-op for fakeTicker
func (f *fakeTicker) Stop() {}

View File

@@ -11,7 +11,6 @@ import (
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/lazyconn/activity"
"github.com/netbirdio/netbird/client/internal/lazyconn/inactivity"
"github.com/netbirdio/netbird/client/internal/lazyconn/inalt"
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
peerid "github.com/netbirdio/netbird/client/internal/peer/id"
"github.com/netbirdio/netbird/client/internal/peerstore"
@@ -54,7 +53,7 @@ type Manager struct {
managedPeersMu sync.Mutex
activityManager *activity.Manager
inactivityManager *inalt.Manager
inactivityManager *inactivity.Manager
// Route HA group management
peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
@@ -75,7 +74,7 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
excludes: make(map[string]lazyconn.PeerConfig),
activityManager: activity.NewManager(wgIface),
inactivityManager: inalt.NewManager(wgIface),
inactivityManager: inactivity.NewManager(wgIface),
peerToHAGroups: make(map[string][]route.HAUniqueID),
haGroupToPeers: make(map[route.HAUniqueID][]string),
}
@@ -145,13 +144,10 @@ func (m *Manager) Start(ctx context.Context) {
return
case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID)
case _ = <-m.inactivityManager.InactivePeersChan:
/*
for _, peerID := range peerIDs {
m.onPeerInactivityTimedOut(peerID)
}
*/
case peerIDs := <-m.inactivityManager.InactivePeersChan:
for _, peerID := range peerIDs {
m.onPeerInactivityTimedOut(peerID)
}
}
}
}
@@ -192,7 +188,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
peerCfg.Log.Infof("peer removed from lazy connection exclude list")
if err := m.addActivePeer(ctx, peerCfg); err != nil {
if err := m.addActivePeer(&peerCfg); err != nil {
log.Errorf("failed to add peer to lazy connection manager: %s", err)
continue
}
@@ -222,7 +218,7 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
return false, err
}
m.inactivityManager.AddPeer(peerCfg.PublicKey)
m.inactivityManager.AddPeer(&peerCfg)
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
@@ -244,7 +240,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon
continue
}
if err := m.addActivePeer(ctx, cfg); err != nil {
if err := m.addActivePeer(&cfg); err != nil {
cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err)
return err
}
@@ -306,7 +302,7 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey)
m.inactivityManager.AddPeer(cfg.PublicKey)
m.inactivityManager.AddPeer(cfg)
return true
}
@@ -352,20 +348,20 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
}
}
func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error {
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed")
return nil
}
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeers[peerCfg.PublicKey] = peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
peerCfg: &peerCfg,
peerCfg: peerCfg,
expectedWatcher: watcherInactivity,
}
peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list")
m.inactivityManager.AddPeer(peerCfg.PublicKey)
m.inactivityManager.AddPeer(peerCfg)
return nil
}
@@ -480,7 +476,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
}
mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected")
m.inactivityManager.AddPeer(mp.peerCfg.PublicKey)
m.inactivityManager.AddPeer(mp.peerCfg)
}
func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {