Compare commits

...

12 Commits

Author SHA1 Message Date
Zoltán Papp
546538f570 Check 5 min intervals 2025-06-23 20:49:02 +02:00
Zoltán Papp
9307e7e0ea Add historical based algorithm 2025-06-21 19:38:44 +02:00
Zoltán Papp
9691e197df Merge branch 'main' into feature/poc-lazy-detection 2025-06-20 18:19:02 +02:00
Zoltán Papp
d380c925c2 Merge branch 'main' into feature/poc-lazy-detection 2025-06-20 17:54:27 +02:00
Zoltán Papp
a22a6f6d26 Remove unused ctx 2025-06-20 16:41:40 +02:00
Zoltán Papp
55e7ca96df Fix counter 2025-06-20 10:24:05 +02:00
Zoltán Papp
2d401a7dce Add signal message 2025-06-19 16:22:59 +02:00
Zoltán Papp
d7e68ff812 Fix test 2025-06-19 10:28:13 +02:00
Zoltán Papp
f3a5e34c3f Change the logic and add moc data 2025-06-18 20:31:06 +02:00
Zoltan Papp
b3c0b46a88 Merge remote-tracking branch 'origin/feature/poc-lazy-detection' into feature/poc-lazy-detection 2025-06-18 09:58:22 +02:00
Zoltan Papp
6c7a8a7741 Add new algorithm 2025-06-18 09:57:37 +02:00
Zoltan Papp
93d8d272bf Add new algorithm 2025-06-14 02:25:05 +02:00
18 changed files with 860 additions and 364 deletions

View File

@@ -198,6 +198,7 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r
if sizes[i] == 0 {
continue
}
log.Infof("--- received Datagram %s from %s, size: %d", msg.Addr, msg.Addr.String(), sizes[i])
addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation
wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep)

View File

@@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) {
excludedPeers = append(excludedPeers, lazyPeerCfg)
}
added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers)
added := e.lazyConnMgr.ExcludePeer(excludedPeers)
for _, peerID := range added {
var peerConn *peer.Conn
var exists bool
@@ -175,7 +175,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co
PeerConnID: conn.ConnID(),
Log: conn.Log,
}
excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg)
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
if err := conn.Open(ctx); err != nil {
@@ -201,7 +201,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
if !ok {
return
}
defer conn.Close()
defer conn.Close(false)
if !e.isStartedWithLazyMgr() {
return
@@ -211,23 +211,25 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
conn.Log.Infof("removed peer from lazy conn manager")
}
func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, bool) {
conn, ok := e.peerStore.PeerConn(peerKey)
if !ok {
return nil, false
}
func (e *ConnMgr) ActivatePeer(ctx context.Context, conn *peer.Conn) {
if !e.isStartedWithLazyMgr() {
return conn, true
return
}
if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found {
if found := e.lazyConnMgr.ActivatePeer(conn.GetKey()); found {
conn.Log.Infof("activated peer from inactive state")
if err := conn.Open(ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
}
return conn, true
}
func (e *ConnMgr) DeactivatePeer(conn *peer.Conn) {
if !e.isStartedWithLazyMgr() {
return
}
e.lazyConnMgr.DeactivatePeer(conn.ConnID())
}
func (e *ConnMgr) Close() {
@@ -275,7 +277,7 @@ func (e *ConnMgr) addPeersToLazyConnManager() error {
lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg)
}
return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs)
return e.lazyConnMgr.AddActivePeers(lazyPeerCfgs)
}
func (e *ConnMgr) closeManager(ctx context.Context) {

View File

@@ -1255,7 +1255,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
}
if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists {
conn.Close()
conn.Close(false)
return fmt.Errorf("peer already exists: %s", peerKey)
}
@@ -1331,11 +1331,16 @@ func (e *Engine) receiveSignalEvents() {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
conn, ok := e.connMgr.OnSignalMsg(e.ctx, msg.Key)
conn, ok := e.peerStore.PeerConn(msg.Key)
if !ok {
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
msgType := msg.GetBody().GetType()
if msgType != sProto.Body_GO_IDLE {
e.connMgr.ActivatePeer(e.ctx, conn)
}
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
@@ -1392,6 +1397,8 @@ func (e *Engine) receiveSignalEvents() {
go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
case sProto.Body_MODE:
case sProto.Body_GO_IDLE:
e.connMgr.DeactivatePeer(conn)
}
return nil

View File

@@ -1,75 +0,0 @@
package inactivity
import (
"context"
"time"
peer "github.com/netbirdio/netbird/client/internal/peer/id"
)
const (
DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
MinimumInactivityThreshold = 3 * time.Minute
)
type Monitor struct {
id peer.ConnID
timer *time.Timer
cancel context.CancelFunc
inactivityThreshold time.Duration
}
func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor {
i := &Monitor{
id: peerID,
timer: time.NewTimer(0),
inactivityThreshold: threshold,
}
i.timer.Stop()
return i
}
func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
i.timer.Reset(i.inactivityThreshold)
defer i.timer.Stop()
ctx, i.cancel = context.WithCancel(ctx)
defer func() {
defer i.cancel()
select {
case <-i.timer.C:
default:
}
}()
select {
case <-i.timer.C:
select {
case timeoutChan <- i.id:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
func (i *Monitor) Stop() {
if i.cancel == nil {
return
}
i.cancel()
}
func (i *Monitor) PauseTimer() {
i.timer.Stop()
}
func (i *Monitor) ResetTimer() {
i.timer.Reset(i.inactivityThreshold)
}
func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) {
i.Stop()
go i.Start(ctx, timeoutChan)
}

View File

@@ -1,156 +0,0 @@
package inactivity
import (
"context"
"testing"
"time"
peerid "github.com/netbirdio/netbird/client/internal/peer/id"
)
type MocPeer struct {
}
func (m *MocPeer) ConnID() peerid.ConnID {
return peerid.ConnID(m)
}
func TestInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestReuseInactivityMonitor(t *testing.T) {
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
for i := 2; i > 0; i-- {
exitChan := make(chan struct{})
testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
go func() {
defer close(exitChan)
im.Start(testTimeoutCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
testTimeoutCancel()
}
}
func TestStopInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
go func() {
time.Sleep(3 * time.Second)
im.Stop()
}()
select {
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestPauseInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10)
defer testTimeoutCancel()
p := &MocPeer{}
trashHold := time.Second * 3
im := NewInactivityMonitor(p.ConnID(), trashHold)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(ctx, timeoutChan)
}()
time.Sleep(1 * time.Second) // grant time to start the monitor
im.PauseTimer()
// check to do not receive timeout
thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second)
defer thresholdCancel()
select {
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-thresholdCtx.Done():
// test ok
case <-tCtx.Done():
t.Fatal("test timed out")
}
// test reset timer
im.ResetTimer()
select {
case <-tCtx.Done():
t.Fatal("test timed out")
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
// expected timeout
}
}

View File

@@ -0,0 +1,249 @@
package inactivity
import (
"container/list"
"context"
"fmt"
"math"
"os"
"strconv"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
// Responder: vmp2
// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes
// - Receive keep alive: 32 bytes, every 25 sec
// Initiator: mp1
// - Receive handshake response:
// - Receive keep alive: 32 bytes, every 25 sec
const (
keepAliveBytes = 32
keepAliveInterval = 25 * time.Second
handshakeInitBytes = 148
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = 1 * time.Minute
historySize = 5 * time.Minute
DefaultInactivityThreshold = 15 * time.Minute
MinimumInactivityThreshold = 5 * time.Minute
recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED"
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerHistory struct {
lastRxBytes int64 // last received bytes
bytesHistory *list.List // linked list of int64
historySize int
summarizedBytes int64
inactivityCounter int // counter to track inactivity
log *log.Entry
}
func newPeerHistory(log *log.Entry, historySize int) *peerHistory {
return &peerHistory{
bytesHistory: list.New(),
historySize: historySize,
log: log,
}
}
func (pi *peerHistory) appendRxBytes(rxBytes int64) {
// If at capacity, remove the oldest element (front)
if pi.bytesHistory.Len() == pi.historySize {
pi.summarizedBytes -= pi.bytesHistory.Front().Value.(int64)
pi.bytesHistory.Remove(pi.bytesHistory.Front())
}
// Add the new rxBytes at the back
pi.bytesHistory.PushBack(rxBytes)
pi.summarizedBytes += rxBytes
}
func (pi *peerHistory) historyString() string {
var history []string
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
history = append(history, fmt.Sprintf("%d", e.Value.(int64)))
}
return fmt.Sprintf("%s", history)
}
func (pi *peerHistory) reset() {
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
e.Value = int64(0)
}
pi.summarizedBytes = 0
}
type Manager struct {
InactivePeersChan chan []string
iface WgInterface
interestedPeers map[string]*peerHistory
maxBytesPerPeriod int64
historySize int // Size of the history buffer for each peer, used to track received bytes over time
recorder *Recorder
thresholdOfInactivity int // Number of consecutive checks with low activity to consider a peer inactive
}
func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager {
inactivityThreshold, err := validateInactivityThreshold(configuredThreshold)
if err != nil {
inactivityThreshold = DefaultInactivityThreshold
log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold)
}
expectedMaxBytes := calculateExpectedMaxBytes()
log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold)
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerHistory),
historySize: int(historySize.Minutes()),
maxBytesPerPeriod: expectedMaxBytes,
thresholdOfInactivity: int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes())),
}
}
func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) {
if _, exists := m.interestedPeers[peerCfg.PublicKey]; exists {
return
}
peerCfg.Log.Debugf("adding peer to inactivity manager")
m.interestedPeers[peerCfg.PublicKey] = newPeerHistory(peerCfg.Log, m.historySize)
}
func (m *Manager) RemovePeer(peer string) {
pi, ok := m.interestedPeers[peer]
if !ok {
return
}
pi.log.Debugf("remove peer from inactivity manager")
delete(m.interestedPeers, peer)
}
func (m *Manager) Start(ctx context.Context) {
enabled, err := strconv.ParseBool(os.Getenv(recorderEnv))
if err == nil && enabled {
m.recorder = NewRecorder()
defer m.recorder.Close()
}
ticker := newTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case tickTime := <-ticker.C():
idlePeers, err := m.checkStats(tickTime)
if err != nil {
log.Errorf("error checking stats: %v", err)
return
}
if len(idlePeers) == 0 {
continue
}
m.notifyInactivePeers(ctx, idlePeers)
}
}
}
func (m *Manager) notifyInactivePeers(ctx context.Context, inactivePeers []string) {
select {
case m.InactivePeersChan <- inactivePeers:
case <-ctx.Done():
return
default:
return
}
}
func (m *Manager) checkStats(now time.Time) ([]string, error) {
stats, err := m.iface.GetStats()
if err != nil {
return nil, err
}
var idlePeers []string
for peer, history := range m.interestedPeers {
stat, found := stats[peer]
if !found {
// when peer is in connecting state
history.log.Warnf("peer not found in wg stats")
}
deltaRx := stat.RxBytes - history.lastRxBytes
if deltaRx < 0 {
deltaRx = 0 // reset to zero if negative
history.reset()
}
m.recorder.ReceivedBytes(peer, now, deltaRx)
history.lastRxBytes = stat.RxBytes
history.appendRxBytes(deltaRx)
// not enough history to determine inactivity
if history.bytesHistory.Len() < m.historySize {
history.log.Debugf("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
continue
}
if history.summarizedBytes <= m.maxBytesPerPeriod {
history.inactivityCounter++
history.log.Debugf("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
} else {
history.inactivityCounter = 0 // reset inactivity counter if activity is detected
history.log.Debugf("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
}
if history.inactivityCounter >= m.thresholdOfInactivity {
history.log.Infof("peer is inactive for %d consecutive checks, marking as idle (limit %d) ", history.inactivityCounter, m.thresholdOfInactivity)
idlePeers = append(idlePeers, peer)
history.inactivityCounter = 0 // reset inactivity counter after marking as idle
}
}
return idlePeers, nil
}
func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) {
if configuredThreshold == nil {
return DefaultInactivityThreshold, nil
}
if *configuredThreshold < MinimumInactivityThreshold {
return 0, fmt.Errorf("configured inactivity threshold %v is too low, using %v", *configuredThreshold, MinimumInactivityThreshold)
}
return *configuredThreshold, nil
}
func calculateExpectedMaxBytes() int64 {
// Calculate number of keep-alive packets expected
keepAliveCount := int64(historySize.Seconds() / keepAliveInterval.Seconds())
keepAliveBytes := keepAliveCount * keepAliveBytes
// Calculate potential handshake packets (conservative estimate)
handshakeCount := int64(historySize.Minutes() / handshakeMaxInterval.Minutes())
handshakeBytes := handshakeCount * (handshakeInitBytes + keepAliveBytes) // handshake + extra bytes
return keepAliveBytes + handshakeBytes
}

View File

@@ -0,0 +1,53 @@
package inactivity
import (
"context"
"fmt"
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/util"
)
func init() {
_ = util.InitLog("trace", "console")
}
func TestNewManager(t *testing.T) {
for i, sc := range scenarios {
timer := NewFakeTimer()
newTicker = func(d time.Duration) Ticker {
return newFakeTicker(d, timer)
}
t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) {
mock := newMockWgInterface("peer1", sc.Data, timer)
manager := NewManager(mock, nil)
peerCfg := &lazyconn.PeerConfig{
PublicKey: "peer1",
Log: log.WithField("peer", "peer1"),
}
manager.AddPeer(peerCfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager.Start(ctx)
var inactiveResult bool
select {
case <-manager.InactivePeersChan:
inactiveResult = true
default:
inactiveResult = false
}
if inactiveResult != sc.ExpectedInactive {
t.Errorf("Expected inactive peers: %v, got: %v", sc.ExpectedInactive, inactiveResult)
}
})
}
}

View File

@@ -0,0 +1,102 @@
package inactivity
import (
"fmt"
"time"
"github.com/netbirdio/netbird/client/iface/configurer"
)
type rxHistory struct {
when time.Duration
RxBytes int64
}
// mockWgInterface mocks WgInterface to simulate peer stats.
type mockWgInterface struct {
peerID string
statsSequence []rxHistory
timer *FakeTimer
initialTime time.Time
reachedLast bool
}
func newMockWgInterface(peerID string, history []rxHistory, timer *FakeTimer) *mockWgInterface {
return &mockWgInterface{
peerID: peerID,
statsSequence: history,
timer: timer,
initialTime: timer.Now(),
}
}
func (m *mockWgInterface) GetStats() (map[string]configurer.WGStats, error) {
if m.reachedLast {
return nil, fmt.Errorf("no more data")
}
now := m.timer.Now()
var rx int64
for i, history := range m.statsSequence {
if now.Before(m.initialTime.Add(history.when)) {
break
}
if len(m.statsSequence)-1 == i {
m.reachedLast = true
}
rx += history.RxBytes
}
wgStats := make(map[string]configurer.WGStats)
wgStats[m.peerID] = configurer.WGStats{
RxBytes: rx,
}
return wgStats, nil
}
// fakeTicker is a controllable ticker for use in tests
type fakeTicker struct {
interval time.Duration
timer *FakeTimer
ch chan time.Time
now time.Time
}
func newFakeTicker(interval time.Duration, timer *FakeTimer) *fakeTicker {
return &fakeTicker{
interval: interval,
timer: timer,
ch: make(chan time.Time, 1),
now: timer.Now(),
}
}
func (f *fakeTicker) C() <-chan time.Time {
f.now = f.now.Add(f.interval)
f.timer.Set(f.now)
f.ch <- f.now
return f.ch
}
func (f *fakeTicker) Stop() {}
type FakeTimer struct {
now time.Time
}
func NewFakeTimer() *FakeTimer {
return &FakeTimer{
now: time.Date(2025, time.June, 1, 0, 0, 0, 0, time.UTC),
}
}
func (f *FakeTimer) Set(t time.Time) {
f.now = t
}
func (f *FakeTimer) Now() time.Time {
return f.now
}

View File

@@ -0,0 +1,58 @@
package inactivity
import (
"fmt"
"os"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type Recorder struct {
mu sync.Mutex
file *os.File
filename string
}
func NewRecorder() *Recorder {
timestamp := time.Now().Format("20060102_150405")
filename := fmt.Sprintf("inactivity_log_%s.txt", timestamp)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Errorf("error opening file: %v", err)
}
return &Recorder{
file: file,
filename: filename,
}
}
func (r *Recorder) ReceivedBytes(peer string, now time.Time, bytes int64) {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
entry := fmt.Sprintf("%s; %s; %d\n", now.Format(time.RFC3339), peer, bytes)
_, err := r.file.WriteString(entry)
if err != nil {
log.Errorf("error writing to file: %v", err)
}
}
func (r *Recorder) Close() {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
if err := r.file.Close(); err != nil {
log.Errorf("error closing file: %v", err)
}
}

View File

@@ -0,0 +1,209 @@
package inactivity
import "time"
type scenario struct {
ExpectedInactive bool
Data []rxHistory
}
var scenarios = []scenario{
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 92},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 92},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 92},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 92},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 92},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 92},
{when: 775 * time.Second, RxBytes: 32},
},
},
{
ExpectedInactive: true,
Data: []rxHistory{
//96
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
//212
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 148},
//96
{when: 125 * time.Second, RxBytes: 32},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
//212
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 148},
//96
{when: 250 * time.Second, RxBytes: 32},
{when: 275 * time.Second, RxBytes: 32},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 148},
{when: 375 * time.Second, RxBytes: 32},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 148},
{when: 500 * time.Second, RxBytes: 32},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 148},
{when: 625 * time.Second, RxBytes: 32},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 148},
{when: 750 * time.Second, RxBytes: 32},
},
},
{
// Rosenpass
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 128},
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 128},
{when: 0 * time.Second, RxBytes: 2},
{when: 35 * time.Second, RxBytes: 32},
{when: 60 * time.Second, RxBytes: 32},
{when: 85 * time.Second, RxBytes: 32},
{when: 110 * time.Second, RxBytes: 32},
{when: 120 * time.Second, RxBytes: 1152},
{when: 120 * time.Second, RxBytes: 92},
{when: 120 * time.Second, RxBytes: 240},
{when: 130 * time.Second, RxBytes: 1200},
{when: 130 * time.Second, RxBytes: 32},
{when: 130 * time.Second, RxBytes: 1200},
{when: 130 * time.Second, RxBytes: 128},
{when: 165 * time.Second, RxBytes: 32},
{when: 190 * time.Second, RxBytes: 32},
{when: 215 * time.Second, RxBytes: 32},
{when: 240 * time.Second, RxBytes: 92},
{when: 240 * time.Second, RxBytes: 1200},
{when: 240 * time.Second, RxBytes: 128},
{when: 260 * time.Second, RxBytes: 1200},
{when: 260 * time.Second, RxBytes: 1200},
{when: 260 * time.Second, RxBytes: 128},
{when: 320 * time.Second, RxBytes: 32},
{when: 345 * time.Second, RxBytes: 32},
{when: 370 * time.Second, RxBytes: 92},
{when: 370 * time.Second, RxBytes: 1200},
{when: 370 * time.Second, RxBytes: 128},
{when: 390 * time.Second, RxBytes: 1200},
{when: 390 * time.Second, RxBytes: 128},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 92},
{when: 500 * time.Second, RxBytes: 1200},
{when: 500 * time.Second, RxBytes: 128},
{when: 520 * time.Second, RxBytes: 1200},
{when: 520 * time.Second, RxBytes: 128},
},
},
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 1152},
{when: 0 * time.Second, RxBytes: 1152},
{when: 0 * time.Second, RxBytes: 240},
{when: 0 * time.Second, RxBytes: 1152},
{when: 1 * time.Second, RxBytes: 240},
{when: 1 * time.Second, RxBytes: 2},
{when: 11 * time.Second, RxBytes: 32},
{when: 121 * time.Second, RxBytes: 1200},
{when: 121 * time.Second, RxBytes: 148},
{when: 121 * time.Second, RxBytes: 32},
{when: 121 * time.Second, RxBytes: 128},
{when: 131 * time.Second, RxBytes: 1152},
{when: 131 * time.Second, RxBytes: 1152},
{when: 131 * time.Second, RxBytes: 240},
{when: 141 * time.Second, RxBytes: 32},
{when: 191 * time.Second, RxBytes: 32},
{when: 241 * time.Second, RxBytes: 1152},
{when: 241 * time.Second, RxBytes: 148},
{when: 241 * time.Second, RxBytes: 32},
{when: 241 * time.Second, RxBytes: 240},
{when: 251 * time.Second, RxBytes: 32},
{when: 261 * time.Second, RxBytes: 1152},
{when: 261 * time.Second, RxBytes: 1152},
{when: 261 * time.Second, RxBytes: 240},
{when: 271 * time.Second, RxBytes: 32},
{when: 296 * time.Second, RxBytes: 32},
{when: 371 * time.Second, RxBytes: 1152},
{when: 371 * time.Second, RxBytes: 148},
{when: 371 * time.Second, RxBytes: 32},
{when: 371 * time.Second, RxBytes: 240},
{when: 381 * time.Second, RxBytes: 32},
{when: 391 * time.Second, RxBytes: 1152},
{when: 391 * time.Second, RxBytes: 240},
{when: 401 * time.Second, RxBytes: 32},
{when: 426 * time.Second, RxBytes: 32},
{when: 501 * time.Second, RxBytes: 1152},
{when: 501 * time.Second, RxBytes: 148},
{when: 501 * time.Second, RxBytes: 32},
{when: 501 * time.Second, RxBytes: 240},
{when: 511 * time.Second, RxBytes: 32},
{when: 521 * time.Second, RxBytes: 1152},
{when: 521 * time.Second, RxBytes: 240},
{when: 531 * time.Second, RxBytes: 32},
{when: 631 * time.Second, RxBytes: 1152},
{when: 631 * time.Second, RxBytes: 148},
{when: 631 * time.Second, RxBytes: 32},
{when: 631 * time.Second, RxBytes: 240},
},
},
}

View File

@@ -0,0 +1,24 @@
package inactivity
import "time"
var newTicker = func(d time.Duration) Ticker {
return &realTicker{t: time.NewTicker(d)}
}
type Ticker interface {
C() <-chan time.Time
Stop()
}
type realTicker struct {
t *time.Ticker
}
func (r *realTicker) C() <-chan time.Time {
return r.t.C
}
func (r *realTicker) Stop() {
r.t.Stop()
}

View File

@@ -52,8 +52,8 @@ type Manager struct {
excludes map[string]lazyconn.PeerConfig
managedPeersMu sync.Mutex
activityManager *activity.Manager
inactivityMonitors map[peerid.ConnID]*inactivity.Monitor
activityManager *activity.Manager
inactivityManager *inactivity.Manager
// Route HA group management
peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
@@ -67,6 +67,7 @@ type Manager struct {
// engineCtx is the context for creating peer Connection
func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
log.Infof("setup lazy connection service")
m := &Manager{
engineCtx: engineCtx,
peerStore: peerStore,
@@ -76,18 +77,9 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
excludes: make(map[string]lazyconn.PeerConfig),
activityManager: activity.NewManager(wgIface),
inactivityMonitors: make(map[peerid.ConnID]*inactivity.Monitor),
inactivityManager: inactivity.NewManager(wgIface, config.InactivityThreshold),
peerToHAGroups: make(map[string][]route.HAUniqueID),
haGroupToPeers: make(map[route.HAUniqueID][]string),
onInactive: make(chan peerid.ConnID),
}
if config.InactivityThreshold != nil {
if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold {
m.inactivityThreshold = *config.InactivityThreshold
} else {
log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold)
}
}
m.connStateListener = &dispatcher.ConnectionListener{
@@ -139,14 +131,18 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) {
func (m *Manager) Start(ctx context.Context) {
defer m.close()
go m.inactivityManager.Start(ctx)
for {
select {
case <-ctx.Done():
return
case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID)
case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(ctx, peerConnID)
m.onPeerActivity(peerConnID)
case peerIDs := <-m.inactivityManager.InactivePeersChan:
for _, peerID := range peerIDs {
m.onPeerInactivityTimedOut(peerID)
}
}
}
}
@@ -156,7 +152,7 @@ func (m *Manager) Start(ctx context.Context) {
// Adds them back to the managed list and start the inactivity listener if they are removed from the exclude list. In
// this case, we suppose that the connection status is connected or connecting.
// If the peer is not exists yet in the managed list then the responsibility is the upper layer to call the AddPeer function
func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerConfig) []string {
func (m *Manager) ExcludePeer(peerConfigs []lazyconn.PeerConfig) []string {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -187,7 +183,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
peerCfg.Log.Infof("peer removed from lazy connection exclude list")
if err := m.addActivePeer(ctx, peerCfg); err != nil {
if err := m.addActivePeer(&peerCfg); err != nil {
log.Errorf("failed to add peer to lazy connection manager: %s", err)
continue
}
@@ -197,7 +193,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
return added
}
func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) {
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -217,9 +213,6 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo
return false, err
}
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
m.inactivityMonitors[peerCfg.PeerConnID] = im
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
peerCfg: &peerCfg,
@@ -229,7 +222,7 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo
// Check if this peer should be activated because its HA group peers are active
if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
m.activateNewPeerInActiveGroup(ctx, peerCfg)
m.activateNewPeerInActiveGroup(peerCfg)
}
return false, nil
@@ -237,7 +230,7 @@ func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (boo
// AddActivePeers adds a list of peers to the lazy connection manager
// suppose these peers was in connected or in connecting states
func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerConfig) error {
func (m *Manager) AddActivePeers(peerCfg []lazyconn.PeerConfig) error {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -247,7 +240,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon
continue
}
if err := m.addActivePeer(ctx, cfg); err != nil {
if err := m.addActivePeer(&cfg); err != nil {
cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err)
return err
}
@@ -264,7 +257,7 @@ func (m *Manager) RemovePeer(peerID string) {
// ActivatePeer activates a peer connection when a signal message is received
// Also activates all peers in the same HA groups as this peer
func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool) {
func (m *Manager) ActivatePeer(peerID string) (found bool) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
cfg, mp := m.getPeerForActivation(peerID)
@@ -272,15 +265,42 @@ func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool)
return false
}
if !m.activateSinglePeer(ctx, cfg, mp) {
if !m.activateSinglePeer(cfg, mp) {
return false
}
m.activateHAGroupPeers(ctx, peerID)
m.activateHAGroupPeers(peerID)
return true
}
func (m *Manager) DeactivatePeer(peerID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
mp, ok := m.managedPeersByConnID[peerID]
if !ok {
return
}
if mp.expectedWatcher != watcherInactivity {
return
}
m.peerStore.PeerConnClose(mp.peerCfg.PublicKey)
mp.peerCfg.Log.Infof("start activity monitor")
mp.expectedWatcher = watcherActivity
m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
// getPeerForActivation checks if a peer can be activated and returns the necessary structs
// Returns nil values if the peer should be skipped
func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) {
@@ -303,25 +323,19 @@ func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *ma
}
// activateSinglePeer activates a single peer (internal method)
func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
func (m *Manager) activateSinglePeer(cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
mp.expectedWatcher = watcherInactivity
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
im, ok := m.inactivityMonitors[cfg.PeerConnID]
if !ok {
cfg.Log.Errorf("inactivity monitor not found for peer")
return false
}
cfg.Log.Infof("starting inactivity monitor")
go im.Start(ctx, m.onInactive)
cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey)
m.inactivityManager.AddPeer(cfg)
return true
}
// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
func (m *Manager) activateHAGroupPeers(triggerPeerID string) {
var peersToActivate []string
m.routesMu.RLock()
@@ -350,7 +364,7 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
continue
}
if m.activateSinglePeer(ctx, cfg, mp) {
if m.activateSinglePeer(cfg, mp) {
activatedCount++
cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
@@ -394,13 +408,13 @@ func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool)
}
// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) {
func (m *Manager) activateNewPeerInActiveGroup(peerCfg lazyconn.PeerConfig) {
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
return
}
if !m.activateSinglePeer(ctx, &peerCfg, mp) {
if !m.activateSinglePeer(&peerCfg, mp) {
return
}
@@ -408,23 +422,19 @@ func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazy
m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
}
func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error {
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed")
return nil
}
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
m.inactivityMonitors[peerCfg.PeerConnID] = im
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeers[peerCfg.PublicKey] = peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
peerCfg: &peerCfg,
peerCfg: peerCfg,
expectedWatcher: watcherInactivity,
}
peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list")
go im.Start(ctx, m.onInactive)
m.inactivityManager.AddPeer(peerCfg)
return nil
}
@@ -436,12 +446,7 @@ func (m *Manager) removePeer(peerID string) {
cfg.Log.Infof("removing lazy peer")
if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
im.Stop()
delete(m.inactivityMonitors, cfg.PeerConnID)
cfg.Log.Debugf("inactivity monitor stopped")
}
m.inactivityManager.RemovePeer(cfg.PublicKey)
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
delete(m.managedPeers, peerID)
delete(m.managedPeersByConnID, cfg.PeerConnID)
@@ -453,10 +458,7 @@ func (m *Manager) close() {
m.connStateDispatcher.RemoveListener(m.connStateListener)
m.activityManager.Close()
for _, iw := range m.inactivityMonitors {
iw.Stop()
}
m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor)
m.managedPeers = make(map[string]*lazyconn.PeerConfig)
m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer)
@@ -511,7 +513,7 @@ func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
return false
}
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
func (m *Manager) onPeerActivity(peerConnID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -528,22 +530,28 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
mp.peerCfg.Log.Infof("detected peer activity")
if !m.activateSinglePeer(ctx, mp.peerCfg, mp) {
if !m.activateSinglePeer(mp.peerCfg, mp) {
return
}
m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey)
m.activateHAGroupPeers(mp.peerCfg.PublicKey)
m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
}
func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) {
func (m *Manager) onPeerInactivityTimedOut(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
mp, ok := m.managedPeersByConnID[peerConnID]
peerCfg, ok := m.managedPeers[peerID]
if !ok {
log.Errorf("peer not found by id: %v", peerConnID)
log.Errorf("peer not found by peerId: %v", peerID)
return
}
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID)
return
}
@@ -553,13 +561,7 @@ func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peeri
}
if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
iw, ok := m.inactivityMonitors[peerConnID]
if ok {
mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements")
iw.ResetMonitor(ctx, m.onInactive)
} else {
mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset")
}
// todo: review to how reset the inactivity detection
return
}
@@ -572,8 +574,7 @@ func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peeri
mp.expectedWatcher = watcherActivity
// just in case free up
m.inactivityMonitors[peerConnID].PauseTimer()
m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
@@ -594,14 +595,8 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
return
}
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
if !ok {
mp.peerCfg.Log.Warnf("inactivity monitor not found for peer")
return
}
mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected")
iw.PauseTimer()
mp.peerCfg.Log.Infof("peer connected, starting inactivity monitor")
m.inactivityManager.AddPeer(mp.peerCfg)
}
func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
@@ -617,11 +612,6 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
return
}
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
if !ok {
return
}
mp.peerCfg.Log.Infof("reset inactivity monitor timer")
iw.ResetTimer()
// todo reset inactivity monitor
mp.peerCfg.Log.Warnf("--- peer disconnected, stopping inactivity monitor?")
}

View File

@@ -1,6 +1,7 @@
package lazyconn
import (
"github.com/netbirdio/netbird/client/iface/configurer"
"net"
"net/netip"
"time"
@@ -11,4 +12,5 @@ import (
type WGIface interface {
RemovePeer(peerKey string) error
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
GetStats() (map[string]configurer.WGStats, error)
}

View File

@@ -226,7 +226,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
// Close closes this peer Conn issuing a close event to the Conn closeCh
func (conn *Conn) Close() {
func (conn *Conn) Close(graceful bool) {
conn.mu.Lock()
defer conn.wgWatcherWg.Wait()
defer conn.mu.Unlock()
@@ -236,6 +236,10 @@ func (conn *Conn) Close() {
return
}
if graceful {
conn.signaler.SignalIdle(conn.config.Key)
}
conn.Log.Infof("close peer connection")
conn.ctxCancel()

View File

@@ -68,3 +68,13 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
return nil
}
func (s *Signaler) SignalIdle(remoteKey string) error {
return s.signal.Send(&sProto.Message{
Key: s.wgPrivateKey.PublicKey().String(),
RemoteKey: remoteKey,
Body: &sProto.Body{
Type: sProto.Body_GO_IDLE,
},
})
}

View File

@@ -95,6 +95,17 @@ func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
}
func (s *Store) PeerConnIdle(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
p.Close(true)
}
func (s *Store) PeerConnClose(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
@@ -103,7 +114,7 @@ func (s *Store) PeerConnClose(pubKey string) {
if !ok {
return
}
p.Close()
p.Close(false)
}
func (s *Store) PeersPubKey() []string {

View File

@@ -29,6 +29,7 @@ const (
Body_ANSWER Body_Type = 1
Body_CANDIDATE Body_Type = 2
Body_MODE Body_Type = 4
Body_GO_IDLE Body_Type = 5
)
// Enum value maps for Body_Type.
@@ -38,12 +39,14 @@ var (
1: "ANSWER",
2: "CANDIDATE",
4: "MODE",
5: "GO_IDLE",
}
Body_Type_value = map[string]int32{
"OFFER": 0,
"ANSWER": 1,
"CANDIDATE": 2,
"MODE": 4,
"GO_IDLE": 5,
}
)
@@ -225,7 +228,7 @@ type Body struct {
FeaturesSupported []uint32 `protobuf:"varint,6,rep,packed,name=featuresSupported,proto3" json:"featuresSupported,omitempty"`
// RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to
RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"`
// relayServerAddress is an IP:port of the relay server
// relayServerAddress is url of the relay server
RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"`
}
@@ -440,7 +443,7 @@ var file_signalexchange_proto_rawDesc = []byte{
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xa6, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xb3, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
@@ -463,33 +466,34 @@ var file_signalexchange_proto_rawDesc = []byte{
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x36, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x22, 0x2e,
0x0a, 0x04, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74,
0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74,
0x88, 0x01, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d,
0x0a, 0x0f, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69,
0x67, 0x12, 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75,
0x62, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64,
0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01,
0x0a, 0x0e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x12, 0x4c, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72,
0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59,
0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x22, 0x2e, 0x0a, 0x04, 0x4d,
0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01,
0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28,
0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61,
0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a,
0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c,
0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e,
0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -47,6 +47,7 @@ message Body {
ANSWER = 1;
CANDIDATE = 2;
MODE = 4;
GO_IDLE = 5;
}
Type type = 1;
string payload = 2;
@@ -74,4 +75,4 @@ message RosenpassConfig {
bytes rosenpassPubKey = 1;
// rosenpassServerAddr is an IP:port of the rosenpass service
string rosenpassServerAddr = 2;
}
}