Fix deprecated event handling

This commit is contained in:
Zoltán Papp
2025-02-28 12:24:44 +01:00
parent 4a5edc1374
commit 4e33582aaa
5 changed files with 55 additions and 56 deletions

View File

@@ -55,11 +55,7 @@ func (e *ConnMgr) Start(parentCtx context.Context) {
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
e.ctxCancel = cancel e.ctxCancel = cancel
e.wg.Add(2) e.wg.Add(1)
go func() {
defer e.wg.Done()
e.lazyConnMgr.Start(ctx)
}()
go func() { go func() {
defer e.wg.Done() defer e.wg.Done()
e.receiveLazyConnEvents(ctx) e.receiveLazyConnEvents(ctx)
@@ -89,6 +85,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
lazyPeerCfg := lazyconn.PeerConfig{ lazyPeerCfg := lazyconn.PeerConfig{
PublicKey: peerKey, PublicKey: peerKey,
AllowedIPs: conn.WgConfig().AllowedIps, AllowedIPs: conn.WgConfig().AllowedIps,
PeerConnID: conn.ConnID(),
} }
if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil { if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil {
log.Errorf("failed to add peer to lazyconn manager: %v", err) log.Errorf("failed to add peer to lazyconn manager: %v", err)
@@ -142,11 +139,11 @@ func (e *ConnMgr) Close() {
func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) { func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) {
for { for {
select { peerID, err := e.lazyConnMgr.NextEvent(ctx)
case <-ctx.Done(): if err != nil {
log.Infof("lazy connection manager closed: %v", err)
return return
case peerID := <-e.lazyConnMgr.PeerActivityChan:
e.peerStore.PeerConnOpen(peerID)
} }
e.peerStore.PeerConnOpen(peerID)
} }
} }

View File

@@ -7,10 +7,16 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/peer"
) )
type OnDemandEvent struct {
PeerID string
PeerConnId peer.ConnID
}
type Manager struct { type Manager struct {
TrafficStartChan chan string TrafficStartChan chan OnDemandEvent
wgIface lazyconn.WGIface wgIface lazyconn.WGIface
@@ -23,7 +29,7 @@ type Manager struct {
func NewManager(wgIface lazyconn.WGIface) *Manager { func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{ m := &Manager{
TrafficStartChan: make(chan string, 1), TrafficStartChan: make(chan OnDemandEvent, 1),
wgIface: wgIface, wgIface: wgIface,
portGenerator: newPortAllocator(), portGenerator: newPortAllocator(),
peers: make(map[string]*Listener), peers: make(map[string]*Listener),
@@ -52,7 +58,7 @@ func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error {
m.peers[peerCfg.PublicKey] = listener m.peers[peerCfg.PublicKey] = listener
log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey) log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey)
go m.waitForTraffic(listener, peerCfg.PublicKey) go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey) log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
return nil return nil
@@ -79,9 +85,10 @@ func (m *Manager) Close() {
listener.Close() listener.Close()
delete(m.peers, peerID) delete(m.peers, peerID)
} }
// todo drain TrafficStartChan
} }
func (m *Manager) waitForTraffic(listener *Listener, peerID string) { func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) {
listener.ReadPackets() listener.ReadPackets()
m.mu.Lock() m.mu.Lock()
@@ -91,19 +98,13 @@ func (m *Manager) waitForTraffic(listener *Listener, peerID string) {
delete(m.peers, peerID) delete(m.peers, peerID)
m.mu.Unlock() m.mu.Unlock()
m.notify(peerID) m.notify(OnDemandEvent{PeerID: peerID, PeerConnId: peerConnID})
} }
// todo: cause issue in this scenario func (m *Manager) notify(event OnDemandEvent) {
// - notify peerID to TrafficStartChan log.Debugf("peer started to send traffic: %s", event.PeerID)
// - do not read the upper layer yet the event
// - RemovePeer(peerID string)
// at this moment we expect to never receive the event for peerID
// - read from TrafficStartChan and the event will be there
func (m *Manager) notify(peerID string) {
log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
select { select {
case <-m.done: case <-m.done:
case m.TrafficStartChan <- peerID: case m.TrafficStartChan <- event:
} }
} }

View File

@@ -2,6 +2,7 @@ package manager
import ( import (
"context" "context"
"fmt"
"sync" "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -13,31 +14,20 @@ import (
// Manager manages lazy connections // Manager manages lazy connections
// This is not a thread safe implementation, do not call exported functions concurrently // This is not a thread safe implementation, do not call exported functions concurrently
type Manager struct { type Manager struct {
PeerActivityChan chan string
listenerMgr *listener.Manager listenerMgr *listener.Manager
managedPeers map[string]lazyconn.PeerConfig managedPeers map[string]lazyconn.PeerConfig
managedPeersMu sync.Mutex managedPeersMu sync.Mutex
closeChan chan struct{}
ctxCancel context.CancelFunc
} }
func NewManager(wgIface lazyconn.WGIface) *Manager { func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{ m := &Manager{
PeerActivityChan: make(chan string, 1), listenerMgr: listener.NewManager(wgIface),
listenerMgr: listener.NewManager(wgIface), managedPeers: make(map[string]lazyconn.PeerConfig),
managedPeers: make(map[string]lazyconn.PeerConfig),
} }
return m return m
} }
// Start to listen for traffic start events
func (m *Manager) Start(parentCtx context.Context) {
ctx, cancel := context.WithCancel(parentCtx)
m.ctxCancel = cancel
m.receiveLazyConnEvents(ctx)
}
func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -78,36 +68,37 @@ func (m *Manager) Close() {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
m.ctxCancel() // todo prevent double call
close(m.closeChan)
m.listenerMgr.Close() m.listenerMgr.Close()
m.managedPeers = make(map[string]lazyconn.PeerConfig) m.managedPeers = make(map[string]lazyconn.PeerConfig)
// clean up the channel for the future reuse
m.drainPeerActivityChan()
} }
func (m *Manager) receiveLazyConnEvents(ctx context.Context) { func (m *Manager) NextEvent(ctx context.Context) (string, error) {
for { for {
select { select {
case <-m.closeChan:
return "", fmt.Errorf("service closed")
case <-ctx.Done(): case <-ctx.Done():
return return "", ctx.Err()
case peerID := <-m.listenerMgr.TrafficStartChan: case e := <-m.listenerMgr.TrafficStartChan:
select { m.managedPeersMu.Lock()
case <-ctx.Done(): // todo instead of peer ID check, check by the peer conn instance id
case m.PeerActivityChan <- peerID: pcfg, ok := m.managedPeers[e.PeerID]
if !ok {
m.managedPeersMu.Unlock()
continue
} }
}
}
}
func (m *Manager) drainPeerActivityChan() { if pcfg.PeerConnID != e.PeerConnId {
for { m.managedPeersMu.Unlock()
select { continue
case <-m.PeerActivityChan: }
default:
return m.managedPeersMu.Unlock()
return e.PeerID, nil
} }
} }
} }

View File

@@ -2,9 +2,12 @@ package lazyconn
import ( import (
"net/netip" "net/netip"
"github.com/netbirdio/netbird/client/internal/peer"
) )
type PeerConfig struct { type PeerConfig struct {
PublicKey string PublicKey string
AllowedIPs []netip.Prefix AllowedIPs []netip.Prefix
PeerConnID peer.ConnID
} }

View File

@@ -10,6 +10,7 @@ import (
"runtime" "runtime"
"sync" "sync"
"time" "time"
"unsafe"
"github.com/pion/ice/v3" "github.com/pion/ice/v3"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@@ -26,6 +27,8 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
) )
type ConnID unsafe.Pointer
type ConnPriority int type ConnPriority int
func (cp ConnPriority) String() string { func (cp ConnPriority) String() string {
@@ -297,6 +300,10 @@ func (conn *Conn) GetKey() string {
return conn.config.Key return conn.config.Key
} }
func (conn *Conn) ConnID() ConnID {
return ConnID(conn)
}
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) { func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
conn.mu.Lock() conn.mu.Lock()