diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/listener/listener.go
index c978b6ba7..77486c29e 100644
--- a/client/internal/lazyconn/listener/listener.go
+++ b/client/internal/lazyconn/listener/listener.go
@@ -2,43 +2,84 @@ package listener
 
 import (
 	"net"
+	"sync"
 
 	log "github.com/sirupsen/logrus"
+
+	"github.com/netbirdio/netbird/client/internal/lazyconn"
 )
 
+// Listener holds the placeholder WireGuard endpoint of a single peer together
+// with the UDP socket on which the first packet for that peer is awaited.
 type Listener struct {
-	peerID string
-
-	conn *net.UDPConn
+	wgIface  lazyconn.WGIface
+	peerCfg  lazyconn.PeerConfig
+	conn     *net.UDPConn
+	endpoint *net.UDPAddr
+	done     sync.Mutex
 }
 
-func NewListener(peerID string, conn *net.UDPConn) *Listener {
+// NewListener configures the peer on wgIface with addr as its endpoint and
+// returns a Listener reading from conn. It returns an error, and no Listener,
+// when the interface update fails; conn is left open in that case. The done
+// mutex is taken here and released by ReadPackets.
+func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig, conn *net.UDPConn, addr *net.UDPAddr) (*Listener, error) {
 	d := &Listener{
-		conn:   conn,
-		peerID: peerID,
+		wgIface:  wgIface,
+		peerCfg:  cfg,
+		conn:     conn,
+		endpoint: addr,
 	}
-	return d
+	if err := d.createEndpoint(); err != nil {
+		return nil, err
+	}
+	d.done.Lock()
+	return d, nil
 }
 
-func (d *Listener) ReadPackets(trigger func(peerID string)) {
+// ReadPackets blocks until a datagram of at least 4 bytes is received or the
+// read fails (for example because Close closed the socket). Either way it then
+// removes the peer from the interface and releases done.
+func (d *Listener) ReadPackets() {
 	for {
 		buffer := make([]byte, 10)
 		n, remoteAddr, err := d.conn.ReadFromUDP(buffer)
 		if err != nil {
 			log.Infof("exit from peer listener reader: %v", err)
-			return
+			break
 		}
 
 		if n < 4 {
 			log.Warnf("received %d bytes from %s, too short", n, remoteAddr)
 			continue
 		}
-		trigger(d.peerID)
+		break
 	}
+
+	d.removeEndpoint()
+	d.done.Unlock()
 }
 
+// Close closes the UDP socket and then blocks on done until ReadPackets has
+// returned. ReadPackets must therefore be running (or have finished), and
+// Close must be called at most once; otherwise it blocks forever.
 func (d *Listener) Close() {
 	if err := d.conn.Close(); err != nil {
 		log.Errorf("failed to close UDP listener: %s", err)
 	}
+	d.done.Lock()
+}
+
+// removeEndpoint removes the peer from the WireGuard interface. A failure is
+// only logged.
+func (d *Listener) removeEndpoint() {
+	if err := d.wgIface.RemovePeer(d.peerCfg.PublicKey); err != nil {
+		log.Warnf("failed to remove peer listener: %v", err)
+	}
+}
+
+// createEndpoint configures the peer on the WireGuard interface with the
+// listener address as its endpoint.
+func (d *Listener) createEndpoint() error {
+	return d.wgIface.UpdatePeer(d.peerCfg.PublicKey, d.peerCfg.AllowedIPs, 0, d.endpoint, nil)
 }
diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go
index 3a3fe6219..58ead3d24 100644
--- a/client/internal/lazyconn/listener/manager.go
+++ b/client/internal/lazyconn/listener/manager.go
@@ -2,7 +2,6 @@ package listener
 
 import (
 	"fmt"
-	"net"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
@@ -16,10 +15,11 @@ type Manager struct {
 	wgIface       lazyconn.WGIface
 	portGenerator *portAllocator
 
-	// todo peers add/remove is not thread safe because of the callback function
-	peers map[string]*Listener
-	wg    sync.WaitGroup
-	done  chan struct{}
+	peers map[string]*Listener
+	done  chan struct{}
+
+	// mu guards peers.
+	mu sync.Mutex
 }
 
 func NewManager(wgIface lazyconn.WGIface) *Manager {
@@ -34,83 +34,99 @@ func NewManager(wgIface lazyconn.WGIface) *Manager {
 }
 
+// CreatePeerListener obtains a UDP socket from the port allocator, registers
+// its address as the endpoint of the peer and starts a goroutine that waits
+// for the first packet. It does nothing if the peer already has a listener.
 func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
 	if _, ok := m.peers[peerCfg.PublicKey]; ok {
 		return nil
 	}
 
-	if err := m.createPeerlistener(peerCfg); err != nil {
-		return err
-	}
-	log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
-	return nil
-}
-
-func (m *Manager) RemovePeer(peerID string) {
-	listener, ok := m.peers[peerID]
-	if !ok {
-		return
-	}
-
-	listener.Close()
-
-	m.removeEndpoint(peerID)
-
-	delete(m.peers, peerID)
-}
-
-func (m *Manager) Close() {
-	close(m.done)
-	for peerID, listener := range m.peers {
-		listener.Close()
-		delete(m.peers, peerID)
-	}
-	m.wg.Wait()
-}
-
-func (m *Manager) createPeerlistener(peerCfg lazyconn.PeerConfig) error {
 	conn, addr, err := m.portGenerator.newConn()
 	if err != nil {
 		return fmt.Errorf("failed to bind lazy connection: %v", err)
 	}
 
-	listener := NewListener(peerCfg.PublicKey, conn)
-
-	if err := m.createEndpoint(peerCfg, addr); err != nil {
-		log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey, err)
-		listener.Close()
+	listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
+	if err != nil {
+		// No listener owns the socket on failure, so release it here.
+		if closeErr := conn.Close(); closeErr != nil {
+			log.Warnf("failed to close lazy connection socket: %v", closeErr)
+		}
 		return err
 	}
+	m.peers[peerCfg.PublicKey] = listener
 
 	log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey)
+	go m.waitForTraffic(listener, peerCfg.PublicKey)
 
-	m.wg.Add(1)
-	go func() {
-		defer m.wg.Done()
-		listener.ReadPackets(m.onTrigger)
-	}()
-
-	m.peers[peerCfg.PublicKey] = listener
+	log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
 	return nil
 }
 
-// todo: it is not thread safe, but it is ok if we protect from upper layer
-func (m *Manager) onTrigger(peerID string) {
-	log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
-	m.removeEndpoint(peerID)
+// RemovePeer unregisters and closes the listener of the peer, if there is one.
+// Listener.Close is called with mu held and returns only after the reader of
+// that listener has exited.
+func (m *Manager) RemovePeer(peerID string) {
+	m.mu.Lock()
+	listener, ok := m.peers[peerID]
+	if !ok {
+		m.mu.Unlock()
+		return
+	}
 	delete(m.peers, peerID)
+	listener.Close()
+	m.mu.Unlock()
+}
 
+// Close closes the done channel, which makes pending notifications give up,
+// and then closes and unregisters every listener.
+func (m *Manager) Close() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	close(m.done)
+	for peerID, listener := range m.peers {
+		listener.Close()
+		delete(m.peers, peerID)
+	}
+}
+
+// waitForTraffic blocks until the listener stops reading and, if the listener
+// is still the one registered for peerID, unregisters it, closes its socket
+// and reports the peer on TrafficStartChan.
+func (m *Manager) waitForTraffic(listener *Listener, peerID string) {
+	listener.ReadPackets()
+
+	m.mu.Lock()
+	// Skip listeners that were removed, or replaced by a newer listener for the
+	// same peer, while this goroutine was not holding the lock.
+	if current, ok := m.peers[peerID]; !ok || current != listener {
+		m.mu.Unlock()
+		return
+	}
+	delete(m.peers, peerID)
+	m.mu.Unlock()
+
+	// The reader has exited, so Close returns immediately and frees the socket.
+	listener.Close()
+	m.notify(peerID)
+}
+
+// notify sends peerID to TrafficStartChan, or gives up once done is closed.
+//
+// todo: cause issue in this scenario
+// - notify peerID to TrafficStartChan
+// - do not read the upper layer yet the event
+// - RemovePeer(peerID string)
+// at this moment we expect to never receive the event for peerID
+// - read from TrafficStartChan and the event will be there
+func (m *Manager) notify(peerID string) {
+	log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
 	select {
 	case <-m.done:
 	case m.TrafficStartChan <- peerID:
 	}
 }
-
-func (m *Manager) removeEndpoint(peerID string) {
-	if err := m.wgIface.RemovePeer(peerID); err != nil {
-		log.Warnf("failed to remove peer listener: %v", err)
-	}
-}
-
-func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error {
-	return m.wgIface.UpdatePeer(peerCfg.PublicKey, peerCfg.AllowedIPs, 0, endpoint, nil)
-}