Add signal message

This commit is contained in:
Zoltán Papp
2025-06-19 16:22:59 +02:00
parent d7e68ff812
commit 2d401a7dce
8 changed files with 117 additions and 51 deletions

View File

@@ -143,7 +143,7 @@ func (m *Manager) Start(ctx context.Context) {
case <-ctx.Done():
return
case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID)
m.onPeerActivity(peerConnID)
case peerIDs := <-m.inactivityManager.InactivePeersChan:
for _, peerID := range peerIDs {
m.onPeerInactivityTimedOut(peerID)
@@ -265,15 +265,42 @@ func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool)
return false
}
if !m.activateSinglePeer(ctx, cfg, mp) {
if !m.activateSinglePeer(cfg, mp) {
return false
}
m.activateHAGroupPeers(ctx, peerID)
m.activateHAGroupPeers(peerID)
return true
}
func (m *Manager) DeactivatePeer(peerID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
mp, ok := m.managedPeersByConnID[peerID]
if !ok {
return
}
if mp.expectedWatcher != watcherInactivity {
return
}
m.peerStore.PeerConnClose(mp.peerCfg.PublicKey)
mp.peerCfg.Log.Infof("start activity monitor")
mp.expectedWatcher = watcherActivity
m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
// getPeerForActivation checks if a peer can be activated and returns the necessary structs
// Returns nil values if the peer should be skipped
func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) {
@@ -296,7 +323,7 @@ func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *ma
}
// activateSinglePeer activates a single peer (internal method)
func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
func (m *Manager) activateSinglePeer(cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
mp.expectedWatcher = watcherInactivity
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
@@ -308,7 +335,7 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf
}
// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
func (m *Manager) activateHAGroupPeers(triggerPeerID string) {
m.routesMu.RLock()
haGroups := m.peerToHAGroups[triggerPeerID]
m.routesMu.RUnlock()
@@ -334,7 +361,7 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
continue
}
if m.activateSinglePeer(ctx, cfg, mp) {
if m.activateSinglePeer(cfg, mp) {
activatedCount++
cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
@@ -398,7 +425,7 @@ func (m *Manager) close() {
log.Infof("lazy connection manager closed")
}
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
func (m *Manager) onPeerActivity(peerConnID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -415,11 +442,11 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
mp.peerCfg.Log.Infof("detected peer activity")
if !m.activateSinglePeer(ctx, mp.peerCfg, mp) {
if !m.activateSinglePeer(mp.peerCfg, mp) {
return
}
m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey)
m.activateHAGroupPeers(mp.peerCfg.PublicKey)
m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
}