This commit is contained in:
Zoltán Papp
2025-03-14 14:20:55 +01:00
parent e7098d7b77
commit 7e422b1d76
3 changed files with 65 additions and 76 deletions

View File

@@ -29,6 +29,7 @@ type ConnMgr struct {
connStateListener *peer.ConnectionListener connStateListener *peer.ConnectionListener
mu sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
ctx context.Context ctx context.Context
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
@@ -57,13 +58,10 @@ func (e *ConnMgr) Start(parentCtx context.Context) {
e.ctx = ctx e.ctx = ctx
e.ctxCancel = cancel e.ctxCancel = cancel
e.wg.Add(1)
go e.receiveLazyEvents(ctx)
e.wg.Add(1) e.wg.Add(1)
go func() { go func() {
defer e.wg.Done() defer e.wg.Done()
e.lazyConnMgr.Start(ctx) e.lazyConnMgr.Start(ctx, e.onActive, e.onInactive)
}() }()
} }
@@ -72,12 +70,17 @@ func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
} }
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
e.mu.Lock()
defer e.mu.Unlock()
if success := e.peerStore.AddPeerConn(peerKey, conn); !success { if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
return true return true
} }
if !e.isStartedWithLazyMgr() { if !e.isStartedWithLazyMgr() {
conn.Open(e.ctx) if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return return
} }
@@ -90,13 +93,17 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
if err != nil { if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
conn.Open(e.ctx) if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return return
} }
if excluded { if excluded {
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection") conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
conn.Open(e.ctx) if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return return
} }
@@ -105,6 +112,9 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
} }
func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
e.mu.Lock()
defer e.mu.Unlock()
conn, ok := e.peerStore.PeerConn(peerKey) conn, ok := e.peerStore.PeerConn(peerKey)
if !ok { if !ok {
return nil, false return nil, false
@@ -115,12 +125,17 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
} }
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found { if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
conn.Open(e.ctx) if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
} }
return conn, true return conn, true
} }
func (e *ConnMgr) RemovePeerConn(peerKey string) { func (e *ConnMgr) RemovePeerConn(peerKey string) {
e.mu.Lock()
defer e.mu.Unlock()
conn, ok := e.peerStore.Remove(peerKey) conn, ok := e.peerStore.Remove(peerKey)
if !ok { if !ok {
return return
@@ -141,25 +156,22 @@ func (e *ConnMgr) Close() {
} }
e.ctxCancel() e.ctxCancel()
e.lazyConnMgr.Close()
e.wg.Wait() e.wg.Wait()
e.lazyConnMgr = nil e.lazyConnMgr = nil
} }
func (e *ConnMgr) receiveLazyEvents(ctx context.Context) { func (e *ConnMgr) onActive(peerID string) {
defer e.wg.Done() e.mu.Lock()
for { defer e.mu.Unlock()
select {
case <-ctx.Done(): e.peerStore.PeerConnOpen(e.ctx, peerID)
return }
case peerID := <-e.lazyConnMgr.OnActive:
e.peerStore.PeerConnOpen(e.ctx, peerID) func (e *ConnMgr) onInactive(peerID string) {
case peerID := <-e.lazyConnMgr.Idle: e.mu.Lock()
// todo consider to use engine lock defer e.mu.Unlock()
e.peerStore.PeerConnClose(peerID)
e.lazyConnMgr.RunActivityMonitor(peerID) e.peerStore.PeerConnClose(peerID)
}
}
} }
func (e *ConnMgr) isStartedWithLazyMgr() bool { func (e *ConnMgr) isStartedWithLazyMgr() bool {

View File

@@ -15,9 +15,6 @@ import (
// Manager manages lazy connections // Manager manages lazy connections
// This is not a thread safe implementation, do not call exported functions concurrently // This is not a thread safe implementation, do not call exported functions concurrently
type Manager struct { type Manager struct {
OnActive chan string
Idle chan string
connStateDispatcher *peer.ConnectionDispatcher connStateDispatcher *peer.ConnectionDispatcher
connStateListener *peer.ConnectionListener connStateListener *peer.ConnectionListener
@@ -35,9 +32,6 @@ type Manager struct {
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager { func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
m := &Manager{ m := &Manager{
OnActive: make(chan string, 1),
Idle: make(chan string, 1),
connStateDispatcher: connStateDispatcher, connStateDispatcher: connStateDispatcher,
managedPeers: make(map[string]*lazyconn.PeerConfig), managedPeers: make(map[string]*lazyconn.PeerConfig),
managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig), managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig),
@@ -57,6 +51,22 @@ func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDi
return m return m
} }
func (m *Manager) Start(ctx context.Context, activeFn func(peerID string), inactiveFn func(peerID string)) {
defer m.close()
ctx, m.cancel = context.WithCancel(ctx)
for {
select {
case <-ctx.Done():
return
case e := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, e, activeFn)
case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(peerConnID, inactiveFn)
}
}
}
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -127,26 +137,6 @@ func (m *Manager) RunInactivityMonitor(peerID string) (found bool) {
return true return true
} }
func (m *Manager) RunActivityMonitor(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
cfg, ok := m.managedPeers[peerID]
if !ok {
return
}
cfg.Log.Infof("start activity monitor")
// just in case free up
m.inactivityMonitors[cfg.PeerConnID].PauseTimer()
if err := m.activityManager.MonitorPeerActivity(*cfg); err != nil {
cfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
func (m *Manager) ExcludePeer(peerID string) { func (m *Manager) ExcludePeer(peerID string) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -154,9 +144,7 @@ func (m *Manager) ExcludePeer(peerID string) {
m.excludes[peerID] = struct{}{} m.excludes[peerID] = struct{}{}
} }
// Close the manager and all the listeners func (m *Manager) close() {
// block until all routine are done and cleanup the exported Channels
func (m *Manager) Close() {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -172,21 +160,7 @@ func (m *Manager) Close() {
log.Infof("lazy connection manager closed") log.Infof("lazy connection manager closed")
} }
func (m *Manager) Start(ctx context.Context) { func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent, onActiveListenerFn func(peerID string)) {
ctx, m.cancel = context.WithCancel(ctx)
for {
select {
case <-ctx.Done():
return
case e := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, e)
case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(ctx, peerConnID)
}
}
}
func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -200,14 +174,10 @@ func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent
pcfg.Log.Infof("starting inactivity monitor") pcfg.Log.Infof("starting inactivity monitor")
go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive) go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive)
select { onActiveListenerFn(e.PeerID)
case m.OnActive <- e.PeerID:
case <-ctx.Done():
return
}
} }
func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peer.ConnID) { func (m *Manager) onPeerInactivityTimedOut(peerConnID peer.ConnID, onInactiveListenerFn func(peerID string)) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -222,12 +192,18 @@ func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peer.
return return
} }
select { onInactiveListenerFn(pcfg.PublicKey)
case m.Idle <- pcfg.PublicKey:
case <-ctx.Done(): pcfg.Log.Infof("start activity monitor")
// just in case free up
m.inactivityMonitors[pcfg.PeerConnID].PauseTimer()
if err := m.activityManager.MonitorPeerActivity(*pcfg); err != nil {
pcfg.Log.Errorf("failed to create activity monitor: %v", err)
return return
} }
} }
func (m *Manager) onPeerConnected(conn *peer.Conn) { func (m *Manager) onPeerConnected(conn *peer.Conn) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()

View File

@@ -267,6 +267,7 @@ func (conn *Conn) Close() {
conn.setStatusToDisconnected() conn.setStatusToDisconnected()
conn.opened = false conn.opened = false
conn.wg.Wait() conn.wg.Wait()
conn.Log.Infof("peer connection closed")
} }
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise