Review notes (free text ahead of the first "diff --git" header):

- Fix: NewManager in client/internal/lazyconn/manager/manager.go now allocates
  closeChan. The patch as received added the field and made Close() call
  close(m.closeChan) without ever creating the channel. Closing a nil channel
  panics, and a receive from a nil channel blocks forever, so NextEvent could
  never observe Close().
- Consistency: OnDemandEvent.PeerConnId is spelled PeerConnID, matching
  lazyconn.PeerConfig.PeerConnID.
- Doc comments added for OnDemandEvent, Manager.NextEvent and peer.ConnID.
- Hunk line counts and new-file offsets are adjusted for the added lines. The
  "index" lines still carry the blob ids of the patch as received.
- NOTE(review): the patch arrived flattened onto a few long lines. Indentation
  and blank context lines were restored from Go formatting conventions and the
  hunk line counts; their exact placement is a best guess (notably one blank
  context line in the Close()/NextEvent hunk). Regenerate the patch from the
  branch before applying it.

diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go
index c56dd3b01..6fb56a93c 100644
--- a/client/internal/conn_mgr.go
+++ b/client/internal/conn_mgr.go
@@ -55,11 +55,7 @@ func (e *ConnMgr) Start(parentCtx context.Context) {
 	ctx, cancel := context.WithCancel(parentCtx)
 	e.ctxCancel = cancel
 
-	e.wg.Add(2)
-	go func() {
-		defer e.wg.Done()
-		e.lazyConnMgr.Start(ctx)
-	}()
+	e.wg.Add(1)
 	go func() {
 		defer e.wg.Done()
 		e.receiveLazyConnEvents(ctx)
@@ -89,6 +85,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
 	lazyPeerCfg := lazyconn.PeerConfig{
 		PublicKey:  peerKey,
 		AllowedIPs: conn.WgConfig().AllowedIps,
+		PeerConnID: conn.ConnID(),
 	}
 	if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil {
 		log.Errorf("failed to add peer to lazyconn manager: %v", err)
@@ -142,11 +139,11 @@ func (e *ConnMgr) Close() {
 
 func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) {
 	for {
-		select {
-		case <-ctx.Done():
+		peerID, err := e.lazyConnMgr.NextEvent(ctx)
+		if err != nil {
+			log.Infof("lazy connection manager closed: %v", err)
 			return
-		case peerID := <-e.lazyConnMgr.PeerActivityChan:
-			e.peerStore.PeerConnOpen(peerID)
 		}
+		e.peerStore.PeerConnOpen(peerID)
 	}
 }
diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go
index 58ead3d24..ef6d5f215 100644
--- a/client/internal/lazyconn/listener/manager.go
+++ b/client/internal/lazyconn/listener/manager.go
@@ -7,10 +7,18 @@ import (
 	log "github.com/sirupsen/logrus"
 
 	"github.com/netbirdio/netbird/client/internal/lazyconn"
+	"github.com/netbirdio/netbird/client/internal/peer"
 )
 
+// OnDemandEvent is sent on TrafficStartChan once a peer's listener returns from ReadPackets.
+// It carries the peer ID together with the ConnID the listener was created for.
+type OnDemandEvent struct {
+	PeerID     string
+	PeerConnID peer.ConnID
+}
+
 type Manager struct {
-	TrafficStartChan chan string
+	TrafficStartChan chan OnDemandEvent
 
 	wgIface lazyconn.WGIface
 
@@ -23,7 +31,7 @@ type Manager struct {
 
 func NewManager(wgIface lazyconn.WGIface) *Manager {
 	m := &Manager{
-		TrafficStartChan: make(chan string, 1),
+		TrafficStartChan: make(chan OnDemandEvent, 1),
 		wgIface:          wgIface,
 		portGenerator:    newPortAllocator(),
 		peers:            make(map[string]*Listener),
@@ -52,7 +60,7 @@ func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error {
 	m.peers[peerCfg.PublicKey] = listener
 
 	log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey)
-	go m.waitForTraffic(listener, peerCfg.PublicKey)
+	go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
 
 	log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
 	return nil
@@ -79,9 +87,10 @@ func (m *Manager) Close() {
 		listener.Close()
 		delete(m.peers, peerID)
 	}
+	// todo drain TrafficStartChan
 }
 
-func (m *Manager) waitForTraffic(listener *Listener, peerID string) {
+func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) {
 	listener.ReadPackets()
 
 	m.mu.Lock()
@@ -91,19 +100,13 @@ func (m *Manager) waitForTraffic(listener *Listener, peerID string) {
 	delete(m.peers, peerID)
 	m.mu.Unlock()
 
-	m.notify(peerID)
+	m.notify(OnDemandEvent{PeerID: peerID, PeerConnID: peerConnID})
 }
 
-// todo: cause issue in this scenario
-// - notify peerID to TrafficStartChan
-// - do not read the upper layer yet the event
-// - RemovePeer(peerID string)
-// at this moment we expect to never receive the event for peerID
-// - read from TrafficStartChan and the event will be there
-func (m *Manager) notify(peerID string) {
-	log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
+func (m *Manager) notify(event OnDemandEvent) {
+	log.Debugf("peer started to send traffic: %s", event.PeerID)
 	select {
 	case <-m.done:
-	case m.TrafficStartChan <- peerID:
+	case m.TrafficStartChan <- event:
 	}
 }
diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go
index a3671711f..2b111dfef 100644
--- a/client/internal/lazyconn/manager/manager.go
+++ b/client/internal/lazyconn/manager/manager.go
@@ -2,6 +2,7 @@ package manager
 
 import (
 	"context"
+	"fmt"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
@@ -13,31 +14,21 @@ import (
 // Manager manages lazy connections
 // This is not a thread safe implementation, do not call exported functions concurrently
 type Manager struct {
-	PeerActivityChan chan string
-
 	listenerMgr    *listener.Manager
 	managedPeers   map[string]lazyconn.PeerConfig
 	managedPeersMu sync.Mutex
-
-	ctxCancel context.CancelFunc
+	closeChan      chan struct{}
 }
 
 func NewManager(wgIface lazyconn.WGIface) *Manager {
 	m := &Manager{
-		PeerActivityChan: make(chan string, 1),
-		listenerMgr:      listener.NewManager(wgIface),
-		managedPeers:     make(map[string]lazyconn.PeerConfig),
+		listenerMgr:  listener.NewManager(wgIface),
+		managedPeers: make(map[string]lazyconn.PeerConfig),
+		closeChan:    make(chan struct{}),
 	}
 	return m
 }
 
-// Start to listen for traffic start events
-func (m *Manager) Start(parentCtx context.Context) {
-	ctx, cancel := context.WithCancel(parentCtx)
-	m.ctxCancel = cancel
-	m.receiveLazyConnEvents(ctx)
-}
-
 func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()
@@ -78,36 +69,41 @@ func (m *Manager) Close() {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()
 
-	m.ctxCancel()
+	// todo prevent double call
+	close(m.closeChan)
 
 	m.listenerMgr.Close()
 
 	m.managedPeers = make(map[string]lazyconn.PeerConfig)
-
-	// clean up the channel for the future reuse
-	m.drainPeerActivityChan()
 }
 
-func (m *Manager) receiveLazyConnEvents(ctx context.Context) {
+// NextEvent blocks until a traffic start event arrives for a peer that is still managed
+// with the same PeerConnID and returns that peer's ID. Events for unknown peers or for a
+// different connection instance are dropped. It returns an error once ctx is done or the
+// manager has been closed.
+func (m *Manager) NextEvent(ctx context.Context) (string, error) {
 	for {
 		select {
+		case <-m.closeChan:
+			return "", fmt.Errorf("service closed")
 		case <-ctx.Done():
-			return
-		case peerID := <-m.listenerMgr.TrafficStartChan:
-			select {
-			case <-ctx.Done():
-			case m.PeerActivityChan <- peerID:
+			return "", ctx.Err()
+		case e := <-m.listenerMgr.TrafficStartChan:
+			m.managedPeersMu.Lock()
+			// todo instead of peer ID check, check by the peer conn instance id
+			pcfg, ok := m.managedPeers[e.PeerID]
+			if !ok {
+				m.managedPeersMu.Unlock()
+				continue
 			}
-		}
-	}
-}
 
-func (m *Manager) drainPeerActivityChan() {
-	for {
-		select {
-		case <-m.PeerActivityChan:
-		default:
-			return
+			if pcfg.PeerConnID != e.PeerConnID {
+				m.managedPeersMu.Unlock()
+				continue
+			}
+
+			m.managedPeersMu.Unlock()
+			return e.PeerID, nil
 		}
 	}
 }
diff --git a/client/internal/lazyconn/peercfg.go b/client/internal/lazyconn/peercfg.go
index 8c45ed357..56540c956 100644
--- a/client/internal/lazyconn/peercfg.go
+++ b/client/internal/lazyconn/peercfg.go
@@ -2,9 +2,12 @@ package lazyconn
 
 import (
 	"net/netip"
+
+	"github.com/netbirdio/netbird/client/internal/peer"
 )
 
 type PeerConfig struct {
 	PublicKey  string
 	AllowedIPs []netip.Prefix
+	PeerConnID peer.ConnID
 }
diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go
index f1d81f21a..6084c3629 100644
--- a/client/internal/peer/conn.go
+++ b/client/internal/peer/conn.go
@@ -10,6 +10,7 @@ import (
 	"runtime"
 	"sync"
 	"time"
+	"unsafe"
 
 	"github.com/pion/ice/v3"
 	log "github.com/sirupsen/logrus"
@@ -26,6 +27,9 @@ import (
 	semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
 )
 
+// ConnID identifies a Conn instance by its address.
+type ConnID unsafe.Pointer
+
 type ConnPriority int
 
 func (cp ConnPriority) String() string {
@@ -297,6 +301,11 @@ func (conn *Conn) GetKey() string {
 	return conn.config.Key
 }
 
+// ConnID returns the identifier of this Conn instance.
+func (conn *Conn) ConnID() ConnID {
+	return ConnID(conn)
+}
+
 // configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
 func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
 	conn.mu.Lock()