Add half done idle manamgenet

This commit is contained in:
Zoltán Papp
2025-03-12 10:30:39 +01:00
parent bfe71a3f5a
commit 332b2c5a88
6 changed files with 264 additions and 134 deletions

View File

@@ -24,19 +24,19 @@ const (
// - Maintaining a list of excluded peers that should always have permanent connections
// - Handling connection establishment based on peer signaling
type ConnMgr struct {
lazyConnMgr *lazyConnManager.Manager
peerStore *peerstore.Store
lazyConnMgr *lazyConnManager.Manager
excludes map[string]struct{}
connStateListener *peer.ConnectionListener
wg sync.WaitGroup
ctxCancel context.CancelFunc
}
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface) *ConnMgr {
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr {
var lazyConnMgr *lazyConnManager.Manager
if os.Getenv(envDisableLazyConn) != "true" {
lazyConnMgr = lazyConnManager.NewManager(iface)
lazyConnMgr = lazyConnManager.NewManager(iface, dispatcher)
}
e := &ConnMgr{
@@ -56,14 +56,11 @@ func (e *ConnMgr) Start(parentCtx context.Context) {
e.ctxCancel = cancel
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.receiveLazyConnEvents(ctx)
}()
go e.receiveLazyEvents(ctx)
}
func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
e.excludes[peerID] = struct{}{}
e.lazyConnMgr.ExcludePeer(peerID)
}
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
@@ -76,21 +73,24 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
return
}
_, exists = e.excludes[peerKey]
if exists {
conn.Open()
return
}
lazyPeerCfg := lazyconn.PeerConfig{
PublicKey: peerKey,
AllowedIPs: conn.WgConfig().AllowedIps,
PeerConnID: conn.ConnID(),
}
if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil {
log.Errorf("failed to add peer to lazyconn manager: %v", err)
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
conn.Open()
return
}
if excluded {
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
conn.Open()
return
}
conn.Log.Infof("peer added to lazy conn manager")
return
}
@@ -138,14 +138,17 @@ func (e *ConnMgr) Close() {
e.lazyConnMgr = nil
}
func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) {
func (e *ConnMgr) receiveLazyEvents(ctx context.Context) {
defer e.wg.Done()
for {
peerID, err := e.lazyConnMgr.NextEvent(ctx)
if err != nil {
log.Infof("lazy connection manager closed: %v", err)
select {
case <-ctx.Done():
return
case peerID := <-e.lazyConnMgr.OnDemand:
e.peerStore.PeerConnOpen(peerID)
case peerID := <-e.lazyConnMgr.Idle:
e.peerStore.PeerConnClose(peerID)
}
e.peerStore.PeerConnOpen(peerID)
}
}

View File

@@ -0,0 +1,54 @@
package manager
import (
"context"
"time"
)
const (
idleTimeout = 60 * time.Minute // went to idle after 1 hour inactivity
)
type IdleWatch struct {
onIdle chan struct{}
timer *time.Timer
}
func NewIdleWatch() *IdleWatch {
i := &IdleWatch{
onIdle: make(chan struct{}, 1),
timer: time.NewTimer(0),
}
i.timer.Stop()
return i
}
// call on open connection
func (i *IdleWatch) Start(ctx context.Context) {
i.timer.Reset(idleTimeout)
defer i.timer.Stop()
select {
case <-i.timer.C:
select {
case i.Idle <- struct{}{}:
default:
}
case <-ctx.Done():
return
}
}
func (i *IdleWatch) Stop() {
// todo implement
}
// call when connected
func (i *IdleWatch) HangUp() {
i.timer.Stop()
}
// call when switch to None priority
func (i *IdleWatch) Reset() {
i.timer.Reset(idleTimeout)
}

View File

@@ -9,43 +9,69 @@ import (
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/lazyconn/listener"
"github.com/netbirdio/netbird/client/internal/peer"
)
// Manager manages lazy connections
// This is not a thread safe implementation, do not call exported functions concurrently
type Manager struct {
listenerMgr *listener.Manager
managedPeers map[string]lazyconn.PeerConfig
OnDemand chan string
Idle chan string
listenerMgr *listener.Manager
managedPeers map[string]lazyconn.PeerConfig
idleWatch map[string]*IdleWatch
excludes map[string]struct{}
managedPeersMu sync.Mutex
closeChan chan struct{}
}
func NewManager(wgIface lazyconn.WGIface) *Manager {
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
m := &Manager{
OnDemand: make(chan string, 1),
Idle: make(chan string, 1),
listenerMgr: listener.NewManager(wgIface),
managedPeers: make(map[string]lazyconn.PeerConfig),
idleWatch: make(map[string]*IdleWatch),
excludes: make(map[string]struct{}),
closeChan: make(chan struct{}),
}
connStateListener := &peer.ConnectionListener{
OnConnected: m.onPeerConnected,
OnDisconnected: m.onPeerDisconnected,
}
connStateDispatcher.AddListener(connStateListener)
e.connStateListener = connStateListener
return m
}
func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
func (m *Manager) AddPeer(peer lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
log.Debugf("adding lazy peer: %s", peer.PublicKey)
_, exists := m.excludes[peer.PublicKey]
if exists {
return true, nil
}
if _, ok := m.managedPeers[peer.PublicKey]; ok {
log.Warnf("peer already managed: %s", peer.PublicKey)
return nil
return false, nil
}
if err := m.listenerMgr.CreatePeerListener(peer); err != nil {
return err
return false, err
}
m.managedPeers[peer.PublicKey] = peer
return nil
return false, nil
}
func (m *Manager) RemovePeer(peerID string) bool {
@@ -58,11 +84,23 @@ func (m *Manager) RemovePeer(peerID string) bool {
log.Debugf("removing lazy peer: %s", peerID)
if idleWatch, ok := m.idleWatch[peerID]; ok {
idleWatch.Stop()
delete(m.idleWatch, peerID)
}
m.listenerMgr.RemovePeer(peerID)
delete(m.managedPeers, peerID)
return true
}
func (m *Manager) ExcludePeer(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
m.excludes[peerID] = struct{}{}
}
// Close the manager and all the listeners
// block until all routine are done and cleanup the exported Channels
func (m *Manager) Close() {
@@ -77,7 +115,7 @@ func (m *Manager) Close() {
m.managedPeers = make(map[string]lazyconn.PeerConfig)
}
func (m *Manager) NextEvent(ctx context.Context) (string, error) {
func (m *Manager) Start(ctx context.Context) (string, error) {
for {
select {
case <-m.closeChan:
@@ -86,7 +124,6 @@ func (m *Manager) NextEvent(ctx context.Context) (string, error) {
return "", ctx.Err()
case e := <-m.listenerMgr.TrafficStartChan:
m.managedPeersMu.Lock()
// todo instead of peer ID check, check by the peer conn instance id
pcfg, ok := m.managedPeers[e.PeerID]
if !ok {
m.managedPeersMu.Unlock()
@@ -98,8 +135,77 @@ func (m *Manager) NextEvent(ctx context.Context) (string, error) {
continue
}
idleWatch := NewIdleWatch()
idleWatch.Start(ctx)
m.idleWatch[e.PeerID] = idleWatch
m.managedPeersMu.Unlock()
return e.PeerID, nil
}
}
}
/*
func (m *Manager) NextOpenEvent(ctx context.Context) (string, error) {
for {
select {
case <-m.closeChan:
return "", fmt.Errorf("service closed")
case <-ctx.Done():
return "", ctx.Err()
case e := <-m.listenerMgr.TrafficStartChan:
m.managedPeersMu.Lock()
pcfg, ok := m.managedPeers[e.PeerID]
if !ok {
m.managedPeersMu.Unlock()
continue
}
if pcfg.PeerConnID != e.PeerConnId {
m.managedPeersMu.Unlock()
continue
}
idleWatch := NewIdleWatch()
idleWatch.Start(ctx)
m.idleWatch[e.PeerID] = idleWatch
m.managedPeersMu.Unlock()
return e.PeerID, nil
}
}
}
*/
func (m *Manager) onPeerConnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
if _, ok := m.excludes[conn.GetKey()]; ok {
return
}
iw, ok := m.idleWatch[conn.GetKey()]
if !ok {
conn.Log.Errorf("idle watch not found for peer")
}
iw.HangUp()
}
func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
if _, ok := m.excludes[conn.GetKey()]; ok {
return
}
iw, ok := m.idleWatch[conn.GetKey()]
if !ok {
conn.Log.Errorf("idle watch not found for peer")
}
iw.Reset()
}

View File

@@ -1,103 +0,0 @@
package watcher
import (
"context"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
const (
checkPeriod = 75 * time.Second // 3 * keep alive time (25s)
expectedMinimumRx = 90 * 2 // 2x keep alive packets
)
type rxHistory struct {
received int64
}
// Watcher checks for peer timeouts
// Todo: this is a naive implementation, we must to finish it
type Watcher struct {
PeerTimedOutChan chan string
wgIface lazyconn.WGIface
peers map[string]*rxHistory
peersMu sync.Mutex
}
func NewWatcher(wgIface lazyconn.WGIface) *Watcher {
return &Watcher{
PeerTimedOutChan: make(chan string, 1),
wgIface: wgIface,
peers: make(map[string]*rxHistory),
}
}
func (m *Watcher) Watch(ctx context.Context) {
timer := time.NewTimer(checkPeriod)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
stats, err := m.wgIface.GetStats()
if err != nil {
log.Errorf("failed to get peer stats: %s", err)
continue
}
m.checkTimeouts(ctx, stats)
}
}
}
func (m *Watcher) AddPeer(peerID string) {
m.peersMu.Lock()
defer m.peersMu.Unlock()
m.peers[peerID] = &rxHistory{}
}
func (m *Watcher) RemovePeer(id string) {
m.peersMu.Lock()
defer m.peersMu.Unlock()
delete(m.peers, id)
}
func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[string]configurer.WGStats) {
m.peersMu.Lock()
defer m.peersMu.Unlock()
for p, rxh := range m.peers {
s, ok := allPeersStats[p]
if !ok {
log.Warnf("no stats for peer %s", p)
}
// received bytes since last check
received := s.RxBytes - rxh.received
if received >= expectedMinimumRx {
rxh.received = s.RxBytes
continue
}
// todo handle that case when swtich from P2P to Relay and the endpoint has been reseted.
// peer timed out
delete(m.peers, p)
select {
case <-ctx.Done():
return
case m.PeerTimedOutChan <- p:
}
}
}

View File

@@ -0,0 +1,59 @@
package peer
import (
"sync"
)
/*
handler := peer.ConnectionListener{
OnConnected: m.onPeerConnected,
OnDisconnected: m.onPeerDisconnected,
}
dispatcher.AddListener(handler)
*/
type ConnectionListener struct {
OnConnected func(peer *Conn)
OnDisconnected func(peer *Conn)
}
type ConnectionDispatcher struct {
listeners map[*ConnectionListener]struct{}
mu sync.Mutex
}
func NewConnectionDispatcher() *ConnectionDispatcher {
return &ConnectionDispatcher{
listeners: make(map[*ConnectionListener]struct{}),
}
}
func (e *ConnectionDispatcher) AddListener(listener *ConnectionListener) {
e.mu.Lock()
defer e.mu.Unlock()
e.listeners[listener] = struct{}{}
}
func (e *ConnectionDispatcher) RemoveListener(listener *ConnectionListener) {
e.mu.Lock()
defer e.mu.Unlock()
delete(e.listeners, listener)
}
func (e *ConnectionDispatcher) NotifyConnected(peer *Conn) {
e.mu.Lock()
defer e.mu.Unlock()
for listener, _ := range e.listeners {
listener.OnConnected(peer)
}
}
func (e *ConnectionDispatcher) NotifyDisconnected(peer *Conn) {
e.mu.Lock()
defer e.mu.Unlock()
for listener, _ := range e.listeners {
listener.OnDisconnected(peer)
}
}

View File

@@ -91,6 +91,17 @@ func (s *Store) PeerConnOpen(pubKey string) {
p.Open()
}
func (s *Store) PeerConnClose(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
p.Close()
}
func (s *Store) PeersPubKey() []string {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()