diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index d3f096f63..c56dd3b01 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -55,9 +55,15 @@ func (e *ConnMgr) Start(parentCtx context.Context) { ctx, cancel := context.WithCancel(parentCtx) e.ctxCancel = cancel - e.lazyConnMgr.Start(ctx) - e.wg.Add(1) - go e.receiveLazyConnEvents(ctx) + e.wg.Add(2) + go func() { + defer e.wg.Done() + e.lazyConnMgr.Start(ctx) + }() + go func() { + defer e.wg.Done() + e.receiveLazyConnEvents(ctx) + }() } func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) { @@ -135,7 +141,6 @@ func (e *ConnMgr) Close() { } func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) { - defer e.wg.Done() for { select { case <-ctx.Done(): diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/listener/listener.go index 9db3d50be..c978b6ba7 100644 --- a/client/internal/lazyconn/listener/listener.go +++ b/client/internal/lazyconn/listener/listener.go @@ -2,7 +2,6 @@ package listener import ( "net" - "sync" log "github.com/sirupsen/logrus" ) @@ -11,8 +10,6 @@ type Listener struct { peerID string conn *net.UDPConn - // todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too - wg sync.WaitGroup } func NewListener(peerID string, conn *net.UDPConn) *Listener { @@ -24,14 +21,11 @@ func NewListener(peerID string, conn *net.UDPConn) *Listener { } func (d *Listener) ReadPackets(trigger func(peerID string)) { - d.wg.Add(1) - defer d.wg.Done() - for { buffer := make([]byte, 10) n, remoteAddr, err := d.conn.ReadFromUDP(buffer) if err != nil { - log.Infof("exit from fake peer reader: %v", err) + log.Infof("exit from peer listener reader: %v", err) return } @@ -47,5 +41,4 @@ func (d *Listener) Close() { if err := d.conn.Close(); err != nil { log.Errorf("failed to close UDP listener: %s", err) } - d.wg.Wait() } diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go index d61e9ef2f..3a3fe6219 100644 --- a/client/internal/lazyconn/listener/manager.go +++ b/client/internal/lazyconn/listener/manager.go @@ -3,6 +3,7 @@ package listener import ( "fmt" "net" + "sync" log "github.com/sirupsen/logrus" @@ -17,6 +18,7 @@ type Manager struct { portGenerator *portAllocator // todo peers add/remove is not thread safe because of the callback function peers map[string]*Listener + wg sync.WaitGroup done chan struct{} } @@ -31,12 +33,12 @@ func NewManager(wgIface lazyconn.WGIface) *Manager { return m } -func (m *Manager) CreateFakePeer(peerCfg lazyconn.PeerConfig) error { +func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error { if _, ok := m.peers[peerCfg.PublicKey]; ok { return nil } - if err := m.createFakePeer(peerCfg); err != nil { + if err := m.createPeerlistener(peerCfg); err != nil { return err } log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey) @@ -51,9 +53,7 @@ func (m *Manager) RemovePeer(peerID string) { listener.Close() - if err := m.wgIface.RemovePeer(peerID); err != nil { - log.Warnf("failed to remove fake peer: %v", err) - } + m.removeEndpoint(peerID) delete(m.peers, peerID) } @@ -64,9 +64,10 @@ func (m *Manager) Close() { listener.Close() delete(m.peers, peerID) } + m.wg.Wait() } -func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { +func (m *Manager) createPeerlistener(peerCfg lazyconn.PeerConfig) error { conn, addr, err := m.portGenerator.newConn() if err != nil { return fmt.Errorf("failed to bind lazy connection: %v", err) @@ -82,7 +83,11 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey) - go listener.ReadPackets(m.onTrigger) + m.wg.Add(1) + go func() { + defer m.wg.Done() + listener.ReadPackets(m.onTrigger) + }() m.peers[peerCfg.PublicKey] = listener return nil @@ -91,10 +96,7 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { // todo: it is not thread safe, but it is ok if we protect from upper layer func (m *Manager) onTrigger(peerID string) { log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID) - if err := m.wgIface.RemovePeer(peerID); err != nil { - log.Warnf("failed to remove fake peer: %v", err) - } - + m.removeEndpoint(peerID) delete(m.peers, peerID) select { @@ -103,6 +105,12 @@ func (m *Manager) onTrigger(peerID string) { } } +func (m *Manager) removeEndpoint(peerID string) { + if err := m.wgIface.RemovePeer(peerID); err != nil { + log.Warnf("failed to remove peer listener: %v", err) + } +} + func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { return m.wgIface.UpdatePeer(peerCfg.PublicKey, peerCfg.AllowedIPs, 0, endpoint, nil) } diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index e6109e65b..a3671711f 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -20,7 +20,6 @@ type Manager struct { managedPeersMu sync.Mutex ctxCancel context.CancelFunc - wg sync.WaitGroup } func NewManager(wgIface lazyconn.WGIface) *Manager { @@ -36,14 +35,7 @@ func NewManager(wgIface lazyconn.WGIface) *Manager { func (m *Manager) Start(parentCtx context.Context) { ctx, cancel := context.WithCancel(parentCtx) m.ctxCancel = cancel - - m.wg.Add(1) - - go func() { - defer m.wg.Done() - defer cancel() - m.receiveLazyConnEvents(ctx) - }() + m.receiveLazyConnEvents(ctx) } func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { @@ -57,7 +49,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { return nil } - if err := m.listenerMgr.CreateFakePeer(peer); err != nil { + if err := m.listenerMgr.CreatePeerListener(peer); err != nil { return err } @@ -89,7 +81,7 @@ func (m *Manager) Close() { m.ctxCancel() m.listenerMgr.Close() - m.wg.Wait() + m.managedPeers = make(map[string]lazyconn.PeerConfig) // clean up the channel for the future reuse @@ -102,18 +94,14 @@ func (m *Manager) receiveLazyConnEvents(ctx context.Context) { case <-ctx.Done(): return case peerID := <-m.listenerMgr.TrafficStartChan: - m.notifyPeerAction(ctx, peerID) + select { + case <-ctx.Done(): + case m.PeerActivityChan <- peerID: + } } } } -func (m *Manager) notifyPeerAction(ctx context.Context, peerID string) { - select { - case <-ctx.Done(): - case m.PeerActivityChan <- peerID: - } -} - func (m *Manager) drainPeerActivityChan() { for { select {