diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go index 0cb8d90c9..8cac31622 100644 --- a/client/internal/conn_mgr.go +++ b/client/internal/conn_mgr.go @@ -8,7 +8,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/client/internal/lazyconn" - lazyConnManager "github.com/netbirdio/netbird/client/internal/lazyconn/manager" + "github.com/netbirdio/netbird/client/internal/lazyconn/manager" "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/peerstore" ) @@ -25,7 +25,7 @@ const ( // - Handling connection establishment based on peer signaling type ConnMgr struct { peerStore *peerstore.Store - lazyConnMgr *lazyConnManager.Manager + lazyConnMgr *manager.Manager connStateListener *peer.ConnectionListener @@ -34,9 +34,9 @@ type ConnMgr struct { } func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr { - var lazyConnMgr *lazyConnManager.Manager + var lazyConnMgr *manager.Manager if os.Getenv(envDisableLazyConn) != "true" { - lazyConnMgr = lazyConnManager.NewManager(iface, dispatcher) + lazyConnMgr = manager.NewManager(iface, dispatcher) } e := &ConnMgr{ @@ -57,6 +57,12 @@ func (e *ConnMgr) Start(parentCtx context.Context) { e.wg.Add(1) go e.receiveLazyEvents(ctx) + + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.lazyConnMgr.Start(ctx) + }() } func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) { @@ -68,7 +74,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { return true } - if !e.isStarted() { + if !e.isStartedWithLazyMgr() { conn.Open() return } @@ -77,6 +83,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { PublicKey: peerKey, AllowedIPs: conn.WgConfig().AllowedIps, PeerConnID: conn.ConnID(), + Log: conn.Log, } excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) if err != nil { @@ -101,12 +108,11 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { return nil, false } - if !e.isStarted() { + if !e.isStartedWithLazyMgr() { return conn, true } - if ok := e.lazyConnMgr.RemovePeer(peerKey); ok { - conn.Log.Infof("removed peer from lazy conn manager") + if found := e.lazyConnMgr.RunIdleWatch(peerKey); found { conn.Open() } return conn, true @@ -119,7 +125,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) { } defer conn.Close() - if !e.isStarted() { + if !e.isStartedWithLazyMgr() { return } @@ -128,7 +134,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) { } func (e *ConnMgr) Close() { - if !e.isStarted() { + if !e.isStartedWithLazyMgr() { return } @@ -147,11 +153,13 @@ func (e *ConnMgr) receiveLazyEvents(ctx context.Context) { case peerID := <-e.lazyConnMgr.OnDemand: e.peerStore.PeerConnOpen(peerID) case peerID := <-e.lazyConnMgr.Idle: + // todo consider to use engine lock e.peerStore.PeerConnClose(peerID) + e.lazyConnMgr.RunOnDemandListener(peerID) } } } -func (e *ConnMgr) isStarted() bool { +func (e *ConnMgr) isStartedWithLazyMgr() bool { return e.lazyConnMgr != nil && e.ctxCancel != nil } diff --git a/client/internal/engine.go b/client/internal/engine.go index 8f173fb55..fc71e392b 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -169,7 +169,8 @@ type Engine struct { sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error) sshServer nbssh.Server - statusRecorder *peer.Status + statusRecorder *peer.Status + peerConnDispatcher *peer.ConnectionDispatcher firewall manager.Manager routeManager routemanager.Manager @@ -423,7 +424,9 @@ func (e *Engine) Start() error { NATExternalIPs: e.parseNATExternalIPMappings(), } - e.connMgr = NewConnMgr(e.peerStore, wgIface) + e.peerConnDispatcher = peer.NewConnectionDispatcher() + + e.connMgr = NewConnMgr(e.peerStore, wgIface, e.peerConnDispatcher) e.connMgr.Start(e.ctx) e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg) @@ -1085,7 +1088,7 @@ func (e *Engine) updateOfflinePeers(offlinePeers []*mgmProto.RemotePeerConfig) { IP: strings.Join(offlinePeer.GetAllowedIps(), ","), PubKey: offlinePeer.GetWgPubKey(), FQDN: offlinePeer.GetFqdn(), - ConnStatus: peer.StatusDisconnected, + ConnStatus: peer.StatusIdle, ConnStatusUpdate: time.Now(), Mux: new(sync.RWMutex), } @@ -1195,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer }, } - peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore) + peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher) if err != nil { return nil, err } diff --git a/client/internal/lazyconn/listener/allocator.go b/client/internal/lazyconn/activity/allocator.go similarity index 98% rename from client/internal/lazyconn/listener/allocator.go rename to client/internal/lazyconn/activity/allocator.go index 807ae655f..05813f8a8 100644 --- a/client/internal/lazyconn/listener/allocator.go +++ b/client/internal/lazyconn/activity/allocator.go @@ -1,4 +1,4 @@ -package listener +package activity import ( "fmt" diff --git a/client/internal/lazyconn/listener/allocator_test.go b/client/internal/lazyconn/activity/allocator_test.go similarity index 97% rename from client/internal/lazyconn/listener/allocator_test.go rename to client/internal/lazyconn/activity/allocator_test.go index 82fee065a..4345ebb3e 100644 --- a/client/internal/lazyconn/listener/allocator_test.go +++ b/client/internal/lazyconn/activity/allocator_test.go @@ -1,4 +1,4 @@ -package listener +package activity import ( "testing" diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/activity/listener.go similarity index 98% rename from client/internal/lazyconn/listener/listener.go rename to client/internal/lazyconn/activity/listener.go index 77486c29e..abad3e5eb 100644 --- a/client/internal/lazyconn/listener/listener.go +++ b/client/internal/lazyconn/activity/listener.go @@ -1,4 +1,4 @@ -package listener +package activity import ( "net" diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/activity/manager.go similarity index 71% rename from client/internal/lazyconn/listener/manager.go rename to client/internal/lazyconn/activity/manager.go index 7c5870049..a3f2af5b7 100644 --- a/client/internal/lazyconn/listener/manager.go +++ b/client/internal/lazyconn/activity/manager.go @@ -1,4 +1,4 @@ -package listener +package activity import ( "fmt" @@ -10,13 +10,13 @@ import ( "github.com/netbirdio/netbird/client/internal/peer" ) -type OnDemandEvent struct { +type OnAcitvityEvent struct { PeerID string PeerConnId peer.ConnID } type Manager struct { - TrafficStartChan chan OnDemandEvent + OnActivityChan chan OnAcitvityEvent wgIface lazyconn.WGIface @@ -29,20 +29,21 @@ type Manager struct { func NewManager(wgIface lazyconn.WGIface) *Manager { m := &Manager{ - TrafficStartChan: make(chan OnDemandEvent, 1), - wgIface: wgIface, - portGenerator: newPortAllocator(), - peers: make(map[string]*Listener), - done: make(chan struct{}), + OnActivityChan: make(chan OnAcitvityEvent, 1), + wgIface: wgIface, + portGenerator: newPortAllocator(), + peers: make(map[string]*Listener), + done: make(chan struct{}), } return m } -func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error { +func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error { m.mu.Lock() defer m.mu.Unlock() if _, ok := m.peers[peerCfg.PublicKey]; ok { + log.Warnf("on-demand listener already exists for: %s", peerCfg.PublicKey) return nil } @@ -64,16 +65,17 @@ func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error { return nil } -func (m *Manager) RemovePeer(peerID string) { +func (m *Manager) RemovePeer(peerID string) bool { m.mu.Lock() + defer m.mu.Unlock() + listener, ok := m.peers[peerID] if !ok { - m.mu.Unlock() - return + return false } delete(m.peers, peerID) listener.Close() - m.mu.Unlock() + return true } func (m *Manager) Close() { @@ -85,7 +87,6 @@ func (m *Manager) Close() { listener.Close() delete(m.peers, peerID) } - // todo drain TrafficStartChan } func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) { @@ -99,13 +100,13 @@ func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID p delete(m.peers, peerID) m.mu.Unlock() - m.notify(OnDemandEvent{PeerID: peerID, PeerConnId: peerConnID}) + m.notify(OnAcitvityEvent{PeerID: peerID, PeerConnId: peerConnID}) } -func (m *Manager) notify(event OnDemandEvent) { +func (m *Manager) notify(event OnAcitvityEvent) { log.Debugf("peer started to send traffic: %s", event.PeerID) select { case <-m.done: - case m.TrafficStartChan <- event: + case m.OnActivityChan <- event: } } diff --git a/client/internal/lazyconn/inactivity/inactivity.go b/client/internal/lazyconn/inactivity/inactivity.go new file mode 100644 index 000000000..1499e1dd7 --- /dev/null +++ b/client/internal/lazyconn/inactivity/inactivity.go @@ -0,0 +1,62 @@ +package inactivity + +import ( + "context" + "time" + + log "github.com/sirupsen/logrus" +) + +const ( + inactivityThreshold = 30 * time.Second // idle after 1 hour inactivity +) + +type InactivityMonitor struct { + peerID string + timer *time.Timer + cancel context.CancelFunc +} + +func NewInactivityMonitor(peerID string) *InactivityMonitor { + i := &InactivityMonitor{ + peerID: peerID, + timer: time.NewTimer(0), + } + i.timer.Stop() + return i +} + +func (i *InactivityMonitor) Start(ctx context.Context, timeoutChan chan string) { + i.timer.Reset(inactivityThreshold) + defer i.timer.Stop() + + ctx, i.cancel = context.WithCancel(ctx) + defer i.cancel() + + select { + case <-i.timer.C: + select { + case timeoutChan <- i.peerID: + log.Infof("--- idle timeout for peer: %s", i.peerID) + case <-ctx.Done(): + return + } + case <-ctx.Done(): + return + } +} + +func (i *InactivityMonitor) Stop() { + log.Info("--- cancel idle timer") + i.cancel() +} + +func (i *InactivityMonitor) PauseTimer() { + log.Info("--- hangup idle timer") + i.timer.Stop() +} + +func (i *InactivityMonitor) ResetTimer() { + log.Info("--- resetting idle timer") + i.timer.Reset(inactivityThreshold) +} diff --git a/client/internal/lazyconn/manager/idle.go b/client/internal/lazyconn/manager/idle.go deleted file mode 100644 index e124acd49..000000000 --- a/client/internal/lazyconn/manager/idle.go +++ /dev/null @@ -1,54 +0,0 @@ -package manager - -import ( - "context" - "time" -) - -const ( - idleTimeout = 60 * time.Minute // went to idle after 1 hour inactivity -) - -type IdleWatch struct { - onIdle chan struct{} - timer *time.Timer -} - -func NewIdleWatch() *IdleWatch { - i := &IdleWatch{ - onIdle: make(chan struct{}, 1), - timer: time.NewTimer(0), - } - i.timer.Stop() - return i -} - -// call on open connection -func (i *IdleWatch) Start(ctx context.Context) { - i.timer.Reset(idleTimeout) - defer i.timer.Stop() - - select { - case <-i.timer.C: - select { - case i.Idle <- struct{}{}: - default: - } - case <-ctx.Done(): - return - } -} - -func (i *IdleWatch) Stop() { - // todo implement -} - -// call when connected -func (i *IdleWatch) HangUp() { - i.timer.Stop() -} - -// call when switch to None priority -func (i *IdleWatch) Reset() { - i.timer.Reset(idleTimeout) -} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index c645cce68..a6d38671d 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -2,13 +2,13 @@ package manager import ( "context" - "fmt" "sync" log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/client/internal/lazyconn" - "github.com/netbirdio/netbird/client/internal/lazyconn/listener" + "github.com/netbirdio/netbird/client/internal/lazyconn/activity" + "github.com/netbirdio/netbird/client/internal/lazyconn/inactivity" "github.com/netbirdio/netbird/client/internal/peer" ) @@ -18,13 +18,16 @@ type Manager struct { OnDemand chan string Idle chan string - listenerMgr *listener.Manager - managedPeers map[string]lazyconn.PeerConfig - idleWatch map[string]*IdleWatch + connStateDispatcher *peer.ConnectionDispatcher + managedPeers map[string]lazyconn.PeerConfig + activityManager *activity.Manager + inactivityMonitors map[string]*inactivity.InactivityMonitor - excludes map[string]struct{} - managedPeersMu sync.Mutex - closeChan chan struct{} + excludes map[string]struct{} + managedPeersMu sync.Mutex + cancel context.CancelFunc + connStateListener *peer.ConnectionListener + onIdle chan string } func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager { @@ -32,20 +35,20 @@ func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDi OnDemand: make(chan string, 1), Idle: make(chan string, 1), - listenerMgr: listener.NewManager(wgIface), - managedPeers: make(map[string]lazyconn.PeerConfig), - idleWatch: make(map[string]*IdleWatch), - excludes: make(map[string]struct{}), - closeChan: make(chan struct{}), + connStateDispatcher: connStateDispatcher, + managedPeers: make(map[string]lazyconn.PeerConfig), + activityManager: activity.NewManager(wgIface), + inactivityMonitors: make(map[string]*inactivity.InactivityMonitor), + excludes: make(map[string]struct{}), + onIdle: make(chan string), } - connStateListener := &peer.ConnectionListener{ + m.connStateListener = &peer.ConnectionListener{ OnConnected: m.onPeerConnected, OnDisconnected: m.onPeerDisconnected, } - connStateDispatcher.AddListener(connStateListener) - e.connStateListener = connStateListener + connStateDispatcher.AddListener(m.connStateListener) return m } @@ -54,7 +57,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) (bool, error) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - log.Debugf("adding lazy peer: %s", peer.PublicKey) + peer.Log.Debugf("adding peer to lazy connection manager") _, exists := m.excludes[peer.PublicKey] if exists { @@ -62,38 +65,82 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) (bool, error) { } if _, ok := m.managedPeers[peer.PublicKey]; ok { - log.Warnf("peer already managed: %s", peer.PublicKey) + peer.Log.Warnf("peer already managed") return false, nil } - if err := m.listenerMgr.CreatePeerListener(peer); err != nil { + if err := m.activityManager.MonitorPeerActivity(peer); err != nil { return false, err } + iw := inactivity.NewInactivityMonitor(peer.PublicKey) + m.inactivityMonitors[peer.PublicKey] = iw + m.managedPeers[peer.PublicKey] = peer return false, nil } -func (m *Manager) RemovePeer(peerID string) bool { +func (m *Manager) RemovePeer(peerID string) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - if _, ok := m.managedPeers[peerID]; !ok { + cfg, ok := m.managedPeers[peerID] + if !ok { + return + } + + cfg.Log.Infof("removing lazy peer") + + if idleWatch, ok := m.inactivityMonitors[peerID]; ok { + idleWatch.Stop() + delete(m.inactivityMonitors, peerID) + cfg.Log.Debugf("idle watch stopped") + } + + m.activityManager.RemovePeer(peerID) + delete(m.managedPeers, peerID) + cfg.Log.Debugf("on-demand listener removed") +} + +func (m *Manager) RunIdleWatch(peerID string) (found bool) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + cfg, ok := m.managedPeers[peerID] + if !ok { return false } - log.Debugf("removing lazy peer: %s", peerID) - - if idleWatch, ok := m.idleWatch[peerID]; ok { - idleWatch.Stop() - delete(m.idleWatch, peerID) + if removed := m.activityManager.RemovePeer(peerID); !removed { + return false } - m.listenerMgr.RemovePeer(peerID) - delete(m.managedPeers, peerID) + m.inactivityMonitors[peerID].PauseTimer() + + cfg.Log.Infof("stoped on-demand listener and idle watcher") return true } +func (m *Manager) RunOnDemandListener(peerID string) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + + cfg, ok := m.managedPeers[peerID] + if !ok { + return + } + + cfg.Log.Infof("run on-demand listener") + + // just in case free up + m.inactivityMonitors[peerID].PauseTimer() + + if err := m.activityManager.MonitorPeerActivity(cfg); err != nil { + cfg.Log.Errorf("failed to create on-demand listener: %v", err) + return + } +} + func (m *Manager) ExcludePeer(peerID string) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -107,77 +154,87 @@ func (m *Manager) Close() { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() - // todo prevent double call - close(m.closeChan) - - m.listenerMgr.Close() + m.cancel() + m.connStateDispatcher.RemoveListener(m.connStateListener) + m.activityManager.Close() + for _, iw := range m.inactivityMonitors { + iw.Stop() + } + m.inactivityMonitors = make(map[string]*inactivity.InactivityMonitor) m.managedPeers = make(map[string]lazyconn.PeerConfig) + log.Infof("lazy connection manager closed") } -func (m *Manager) Start(ctx context.Context) (string, error) { +func (m *Manager) Start(ctx context.Context) { + ctx, m.cancel = context.WithCancel(ctx) for { select { - case <-m.closeChan: - return "", fmt.Errorf("service closed") case <-ctx.Done(): - return "", ctx.Err() - case e := <-m.listenerMgr.TrafficStartChan: - m.managedPeersMu.Lock() - pcfg, ok := m.managedPeers[e.PeerID] - if !ok { - m.managedPeersMu.Unlock() - continue - } - - if pcfg.PeerConnID != e.PeerConnId { - m.managedPeersMu.Unlock() - continue - } - - idleWatch := NewIdleWatch() - idleWatch.Start(ctx) - m.idleWatch[e.PeerID] = idleWatch - - m.managedPeersMu.Unlock() - return e.PeerID, nil + return + case e := <-m.activityManager.OnActivityChan: + m.onPeerDemand(ctx, e) + case peerID := <-m.onIdle: + m.onPeerIdleTimeout(ctx, peerID) } } } -/* -func (m *Manager) NextOpenEvent(ctx context.Context) (string, error) { - for { - select { - case <-m.closeChan: - return "", fmt.Errorf("service closed") - case <-ctx.Done(): - return "", ctx.Err() - case e := <-m.listenerMgr.TrafficStartChan: - m.managedPeersMu.Lock() - pcfg, ok := m.managedPeers[e.PeerID] - if !ok { - m.managedPeersMu.Unlock() - continue - } +func (m *Manager) onPeerDemand(ctx context.Context, e activity.OnAcitvityEvent) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() - if pcfg.PeerConnID != e.PeerConnId { - m.managedPeersMu.Unlock() - continue - } + pcfg, ok := m.managedPeers[e.PeerID] + if !ok { + return + } - idleWatch := NewIdleWatch() - idleWatch.Start(ctx) - m.idleWatch[e.PeerID] = idleWatch + pcfg.Log.Infof("detected traffic initiative") - m.managedPeersMu.Unlock() - return e.PeerID, nil - } + if pcfg.PeerConnID != e.PeerConnId { + pcfg.Log.Debugf("peer conn instance id mismatch, doing nothing") + return + } + + pcfg.Log.Infof("starting idle watcher") + go m.inactivityMonitors[e.PeerID].Start(ctx, m.Idle) + + select { + case m.OnDemand <- e.PeerID: + case <-ctx.Done(): + return } } -*/ +func (m *Manager) onPeerIdleTimeout(ctx context.Context, peerID string) { + m.managedPeersMu.Lock() + defer m.managedPeersMu.Unlock() + pcfg, ok := m.managedPeers[peerID] + if !ok { + return + } + + pcfg.Log.Infof("connection timed out") + + /* + if pcfg.PeerConnID != e.PeerConnId { + pcfg.Log.Debugf("peer conn instance id mismatch, doing nothing") + return + } + + */ + + if _, ok := m.inactivityMonitors[peerID]; !ok { + return + } + + select { + case m.Idle <- peerID: + case <-ctx.Done(): + return + } +} func (m *Manager) onPeerConnected(conn *peer.Conn) { m.managedPeersMu.Lock() defer m.managedPeersMu.Unlock() @@ -186,12 +243,13 @@ func (m *Manager) onPeerConnected(conn *peer.Conn) { return } - iw, ok := m.idleWatch[conn.GetKey()] + iw, ok := m.inactivityMonitors[conn.GetKey()] if !ok { conn.Log.Errorf("idle watch not found for peer") + return } - iw.HangUp() + iw.PauseTimer() } func (m *Manager) onPeerDisconnected(conn *peer.Conn) { @@ -202,10 +260,10 @@ func (m *Manager) onPeerDisconnected(conn *peer.Conn) { return } - iw, ok := m.idleWatch[conn.GetKey()] + iw, ok := m.inactivityMonitors[conn.GetKey()] if !ok { - conn.Log.Errorf("idle watch not found for peer") + return } - iw.Reset() + iw.ResetTimer() } diff --git a/client/internal/lazyconn/peercfg.go b/client/internal/lazyconn/peercfg.go index 56540c956..5c5a28580 100644 --- a/client/internal/lazyconn/peercfg.go +++ b/client/internal/lazyconn/peercfg.go @@ -3,6 +3,8 @@ package lazyconn import ( "net/netip" + log "github.com/sirupsen/logrus" + "github.com/netbirdio/netbird/client/internal/peer" ) @@ -10,4 +12,5 @@ type PeerConfig struct { PublicKey string AllowedIPs []netip.Prefix PeerConnID peer.ConnID + Log *log.Entry } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index eaee10930..9a2f9a70a 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -115,13 +115,14 @@ type Conn struct { wgProxyICE wgproxy.Proxy wgProxyRelay wgproxy.Proxy - guard *guard.Guard - semaphore *semaphoregroup.SemaphoreGroup + guard *guard.Guard + semaphore *semaphoregroup.SemaphoreGroup + peerConnDispatcher *ConnectionDispatcher } // NewConn creates a new not opened Conn to the remote peer. // To establish a connection run Conn.Open -func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup) (*Conn, error) { +func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) { if len(config.WgConfig.AllowedIps) == 0 { return nil, fmt.Errorf("allowed IPs is empty") } @@ -130,16 +131,17 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu connLog := log.WithField("peer", config.Key) var conn = &Conn{ - Log: connLog, - ctx: ctx, - ctxCancel: ctxCancel, - config: config, - statusRecorder: statusRecorder, - signaler: signaler, - relayManager: relayManager, - statusRelay: NewAtomicConnStatus(), - statusICE: NewAtomicConnStatus(), - semaphore: semaphore, + Log: connLog, + ctx: ctx, + ctxCancel: ctxCancel, + config: config, + statusRecorder: statusRecorder, + signaler: signaler, + relayManager: relayManager, + statusRelay: NewAtomicConnStatus(), + statusICE: NewAtomicConnStatus(), + semaphore: semaphore, + peerConnDispatcher: peerConnDispatcher, } ctrl := isController(config) @@ -167,12 +169,16 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu // Open opens connection to the remote peer // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. -// todo: prevent double open func (conn *Conn) Open() { conn.semaphore.Add(conn.ctx) conn.mu.Lock() defer conn.mu.Unlock() + + if conn.opened { + conn.semaphore.Done(conn.ctx) + return + } conn.opened = true go conn.handshaker.Listen() @@ -180,7 +186,7 @@ func (conn *Conn) Open() { peerState := State{ PubKey: conn.config.Key, ConnStatusUpdate: time.Now(), - ConnStatus: StatusDisconnected, + ConnStatus: StatusConnecting, Mux: new(sync.RWMutex), } err := conn.statusRecorder.UpdatePeerState(peerState) @@ -248,6 +254,7 @@ func (conn *Conn) Close() { } conn.setStatusToDisconnected() + conn.opened = false } // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise @@ -371,10 +378,16 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC return } wgConfigWorkaround() + + oldState := conn.currentConnPriority conn.currentConnPriority = priority conn.statusICE.Set(StatusConnected) conn.updateIceState(iceConnInfo) conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) + + if oldState == connPriorityNone { + conn.peerConnDispatcher.NotifyConnected(conn) + } } func (conn *Conn) onICEStateDisconnected() { @@ -406,13 +419,14 @@ func (conn *Conn) onICEStateDisconnected() { } else { conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String()) conn.currentConnPriority = connPriorityNone + conn.peerConnDispatcher.NotifyDisconnected(conn) } - changed := conn.statusICE.Get() != StatusDisconnected + changed := conn.statusICE.Get() != StatusIdle if changed { conn.guard.SetICEConnDisconnected() } - conn.statusICE.Set(StatusDisconnected) + conn.statusICE.Set(StatusIdle) peerState := State{ PubKey: conn.config.Key, @@ -477,6 +491,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) { conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.Log.Infof("start to communicate with peer via relay") conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr) + conn.peerConnDispatcher.NotifyConnected(conn) } func (conn *Conn) onRelayDisconnected() { @@ -494,6 +509,8 @@ func (conn *Conn) onRelayDisconnected() { if err := conn.removeWgPeer(); err != nil { conn.Log.Errorf("failed to remove wg endpoint: %v", err) } + conn.currentConnPriority = connPriorityNone + conn.peerConnDispatcher.NotifyDisconnected(conn) } if conn.wgProxyRelay != nil { @@ -501,11 +518,11 @@ func (conn *Conn) onRelayDisconnected() { conn.wgProxyRelay = nil } - changed := conn.statusRelay.Get() != StatusDisconnected + changed := conn.statusRelay.Get() != StatusIdle if changed { conn.guard.SetRelayedConnDisconnected() } - conn.statusRelay.Set(StatusDisconnected) + conn.statusRelay.Set(StatusIdle) peerState := State{ PubKey: conn.config.Key, @@ -578,12 +595,12 @@ func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) { } func (conn *Conn) setStatusToDisconnected() { - conn.statusRelay.Set(StatusDisconnected) - conn.statusICE.Set(StatusDisconnected) + conn.statusRelay.Set(StatusIdle) + conn.statusICE.Set(StatusIdle) peerState := State{ PubKey: conn.config.Key, - ConnStatus: StatusDisconnected, + ConnStatus: StatusIdle, ConnStatusUpdate: time.Now(), Mux: new(sync.RWMutex), } @@ -622,7 +639,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) { } func (conn *Conn) isRelayed() bool { - if conn.statusRelay.Get() == StatusDisconnected && (conn.statusICE.Get() == StatusDisconnected || conn.statusICE.Get() == StatusConnecting) { + if conn.statusRelay.Get() == StatusIdle && (conn.statusICE.Get() == StatusIdle || conn.statusICE.Get() == StatusConnecting) { return false } @@ -642,7 +659,7 @@ func (conn *Conn) evalStatus() ConnStatus { return StatusConnecting } - return StatusDisconnected + return StatusIdle } func (conn *Conn) isConnectedOnAllWay() (connected bool) { @@ -655,7 +672,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) { } }() - if conn.statusICE.Get() == StatusDisconnected { + if conn.statusICE.Get() == StatusIdle { return false } diff --git a/client/internal/peer/conn_status.go b/client/internal/peer/conn_status.go index 3c747864f..3a22bdc65 100644 --- a/client/internal/peer/conn_status.go +++ b/client/internal/peer/conn_status.go @@ -7,12 +7,12 @@ import ( ) const ( - // StatusConnected indicate the peer is in connected state - StatusConnected ConnStatus = iota + // StatusIdle indicate the peer is in disconnected state + StatusIdle ConnStatus = iota // StatusConnecting indicate the peer is in connecting state StatusConnecting - // StatusDisconnected indicate the peer is in disconnected state - StatusDisconnected + // StatusConnected indicate the peer is in connected state + StatusConnected ) // ConnStatus describe the status of a peer's connection @@ -26,7 +26,7 @@ type AtomicConnStatus struct { // NewAtomicConnStatus creates a new AtomicConnStatus with the given initial status func NewAtomicConnStatus() *AtomicConnStatus { acs := &AtomicConnStatus{} - acs.Set(StatusDisconnected) + acs.Set(StatusIdle) return acs } @@ -51,8 +51,8 @@ func (s ConnStatus) String() string { return "Connecting" case StatusConnected: return "Connected" - case StatusDisconnected: - return "Disconnected" + case StatusIdle: + return "Idle" default: log.Errorf("unknown status: %d", s) return "INVALID_PEER_CONNECTION_STATUS" diff --git a/client/internal/peer/conn_status_test.go b/client/internal/peer/conn_status_test.go index 6088df55d..33665eef1 100644 --- a/client/internal/peer/conn_status_test.go +++ b/client/internal/peer/conn_status_test.go @@ -14,7 +14,7 @@ func TestConnStatus_String(t *testing.T) { want string }{ {"StatusConnected", StatusConnected, "Connected"}, - {"StatusDisconnected", StatusDisconnected, "Disconnected"}, + {"StatusIdle", StatusIdle, "Disconnected"}, {"StatusConnecting", StatusConnecting, "Connecting"}, } diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index 505bedb7f..5e4c5a521 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -138,11 +138,11 @@ func TestConn_Status(t *testing.T) { want ConnStatus }{ {"StatusConnected", StatusConnected, StatusConnected, StatusConnected}, - {"StatusDisconnected", StatusDisconnected, StatusDisconnected, StatusDisconnected}, + {"StatusIdle", StatusIdle, StatusIdle, StatusIdle}, {"StatusConnecting", StatusConnecting, StatusConnecting, StatusConnecting}, - {"StatusConnectingIce", StatusConnecting, StatusDisconnected, StatusConnecting}, + {"StatusConnectingIce", StatusConnecting, StatusIdle, StatusConnecting}, {"StatusConnectingIceAlternative", StatusConnecting, StatusConnected, StatusConnected}, - {"StatusConnectingRelay", StatusDisconnected, StatusConnecting, StatusConnecting}, + {"StatusConnectingRelay", StatusIdle, StatusConnecting, StatusConnecting}, {"StatusConnectingRelayAlternative", StatusConnected, StatusConnecting, StatusConnected}, } diff --git a/client/internal/peer/dispatcher.go b/client/internal/peer/dispatcher.go index 077d336d4..0e522f646 100644 --- a/client/internal/peer/dispatcher.go +++ b/client/internal/peer/dispatcher.go @@ -4,15 +4,6 @@ import ( "sync" ) -/* - handler := peer.ConnectionListener{ - OnConnected: m.onPeerConnected, - OnDisconnected: m.onPeerDisconnected, - } - - dispatcher.AddListener(handler) -*/ - type ConnectionListener struct { OnConnected func(peer *Conn) OnDisconnected func(peer *Conn) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index b1a208e57..67f1cc07a 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -216,7 +216,7 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string) error { d.peers[peerPubKey] = State{ PubKey: peerPubKey, IP: ip, - ConnStatus: StatusDisconnected, + ConnStatus: StatusIdle, FQDN: fqdn, Mux: new(sync.RWMutex), } @@ -466,9 +466,9 @@ func shouldSkipNotify(receivedConnStatus ConnStatus, curr State) bool { switch { case receivedConnStatus == StatusConnecting: return true - case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting: + case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusConnecting: return true - case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected: + case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusIdle: return curr.IP != "" default: return false