From a21f6ecb0a5d7ba45b4bf570a7af62ba1f66447d Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Mon, 4 May 2026 11:59:01 +0200 Subject: [PATCH] [client] release Status.mux before invoking notifier callbacks (#6039) The Status recorder used to fire notifier callbacks while holding d.mux: - notifyPeerListChanged / notifyPeerStateChangeListeners ran from inside the locked section of every Update*/AddPeerStateRoute/etc. - notifyAddressChanged ran from UpdateLocalPeerState and CleanLocalPeerState while d.mux was held. - onConnectionChanged was registered with a defer above defer d.mux.Unlock, so it executed before the mutex was released in the Mark*Connected/ Disconnected helpers. - notifyPeerStateChangeListeners did a blocking channel send under d.mux, so a slow subscriber stalled every other d.mux holder. A listener that re-enters the recorder (e.g. calls GetFullStatus from within a callback) deadlocks against d.mux, and any callback that takes longer than expected stalls every other state query for its duration. Capture the values needed for notification under the lock, release d.mux, then call the notifier. Build per-peer router-state snapshots inside the lock and dispatch them via dispatchRouterPeers afterwards. The router-peer channel send stays blocking, but now happens outside d.mux so a slow consumer cannot stall any other d.mux holder, and no peer state transitions are silently dropped. The notifier itself is unchanged: its internal state was already protected by its own locks, and the field d.notifier is set once in NewRecorder and never reassigned, so reading it without d.mux is safe. Also fix a pre-existing race in Test_notifier_RemoveListener / Test_notifier_SetListener: setListener spawns a goroutine that writes listener.peers, but the tests read listener.peers without waiting for it. --- client/internal/peer/notifier_test.go | 17 ++ client/internal/peer/status.go | 229 +++++++++++++++++--------- 2 files changed, 170 insertions(+), 76 deletions(-) diff --git a/client/internal/peer/notifier_test.go b/client/internal/peer/notifier_test.go index bbdc00e13..0b7722b0c 100644 --- a/client/internal/peer/notifier_test.go +++ b/client/internal/peer/notifier_test.go @@ -8,6 +8,7 @@ import ( type mocListener struct { lastState int wg sync.WaitGroup + peersWg sync.WaitGroup peers int } @@ -33,6 +34,7 @@ func (l *mocListener) OnAddressChanged(host, addr string) { } func (l *mocListener) OnPeersListChanged(size int) { l.peers = size + l.peersWg.Done() } func (l *mocListener) setWaiter() { @@ -43,6 +45,14 @@ func (l *mocListener) wait() { l.wg.Wait() } +func (l *mocListener) setPeersWaiter() { + l.peersWg.Add(1) +} + +func (l *mocListener) waitPeers() { + l.peersWg.Wait() +} + func Test_notifier_serverState(t *testing.T) { type scenario struct { @@ -72,11 +82,13 @@ func Test_notifier_serverState(t *testing.T) { func Test_notifier_SetListener(t *testing.T) { listener := &mocListener{} listener.setWaiter() + listener.setPeersWaiter() n := newNotifier() n.lastNotification = stateConnecting n.setListener(listener) listener.wait() + listener.waitPeers() if listener.lastState != n.lastNotification { t.Errorf("invalid state: %d, expected: %d", listener.lastState, n.lastNotification) } @@ -85,9 +97,14 @@ func Test_notifier_SetListener(t *testing.T) { func Test_notifier_RemoveListener(t *testing.T) { listener := &mocListener{} listener.setWaiter() + listener.setPeersWaiter() n := newNotifier() n.lastNotification = stateConnecting n.setListener(listener) + // setListener replays cached state on a goroutine; wait for both the state + // and peers callbacks to finish so we don't race on listener.peers. + listener.wait() + listener.waitPeers() n.removeListener() n.peerListChanged(1) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 7bd19b0e1..e8e61f660 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -320,10 +320,10 @@ func (d *Status) RemovePeer(peerPubKey string) error { // UpdatePeerState updates peer status func (d *Status) UpdatePeerState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -343,23 +343,29 @@ func (d *Status) UpdatePeerState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } - + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) // when we close the connection we will not notify the router manager - if receivedState.ConnStatus == StatusIdle { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + notifyRouter := receivedState.ConnStatus == StatusIdle + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() + + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[peer] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -371,17 +377,20 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R d.routeIDLookup.AddRemoteRouteID(resourceId, pref) } + numPeers := d.numOfPeers() + d.mux.Unlock() + // todo: consider to make sense of this notification or not - d.notifyPeerListChanged() + d.notifier.peerListChanged(numPeers) return nil } func (d *Status) RemovePeerStateRoute(peer string, route string) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[peer] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -393,8 +402,11 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error { d.routeIDLookup.RemoveRemoteRouteID(pref) } + numPeers := d.numOfPeers() + d.mux.Unlock() + // todo: consider to make sense of this notification or not - d.notifyPeerListChanged() + d.notifier.peerListChanged(numPeers) return nil } @@ -410,10 +422,10 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) { func (d *Status) UpdatePeerICEState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -431,22 +443,28 @@ func (d *Status) UpdatePeerICEState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerRelayedState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -461,22 +479,28 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -490,22 +514,28 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -522,12 +552,18 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } @@ -594,17 +630,33 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro // FinishPeerListModifications this event invoke the notification func (d *Status) FinishPeerListModifications() { d.mux.Lock() - defer d.mux.Unlock() if !d.peerListChangedForNotification { + d.mux.Unlock() return } d.peerListChangedForNotification = false - d.notifyPeerListChanged() + numPeers := d.numOfPeers() + // snapshot per-peer router state to deliver after the lock is released + type routerDispatch struct { + peerID string + snapshot map[string]RouterState + } + dispatches := make([]routerDispatch, 0, len(d.peers)) for key := range d.peers { - d.notifyPeerStateChangeListeners(key) + snapshot := d.snapshotRouterPeersLocked(key, true) + if snapshot != nil { + dispatches = append(dispatches, routerDispatch{peerID: key, snapshot: snapshot}) + } + } + + d.mux.Unlock() + + d.notifier.peerListChanged(numPeers) + for _, rd := range dispatches { + d.dispatchRouterPeers(rd.peerID, rd.snapshot) } } @@ -655,10 +707,12 @@ func (d *Status) GetLocalPeerState() LocalPeerState { // UpdateLocalPeerState updates local peer status func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) { d.mux.Lock() - defer d.mux.Unlock() - d.localPeer = localPeerState - d.notifyAddressChanged() + fqdn := d.localPeer.FQDN + ip := d.localPeer.IP + d.mux.Unlock() + + d.notifier.localAddressChanged(fqdn, ip) } // AddLocalPeerStateRoute adds a route to the local peer state @@ -721,30 +775,36 @@ func (d *Status) CleanLocalPeerStateRoutes() { // CleanLocalPeerState cleans local peer status func (d *Status) CleanLocalPeerState() { d.mux.Lock() - defer d.mux.Unlock() - d.localPeer = LocalPeerState{} - d.notifyAddressChanged() + fqdn := d.localPeer.FQDN + ip := d.localPeer.IP + d.mux.Unlock() + + d.notifier.localAddressChanged(fqdn, ip) } // MarkManagementDisconnected sets ManagementState to disconnected func (d *Status) MarkManagementDisconnected(err error) { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.managementState = false d.managementError = err + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // MarkManagementConnected sets ManagementState to connected func (d *Status) MarkManagementConnected() { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.managementState = true d.managementError = nil + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // UpdateSignalAddress update the address of the signal server @@ -778,21 +838,25 @@ func (d *Status) UpdateLazyConnection(enabled bool) { // MarkSignalDisconnected sets SignalState to disconnected func (d *Status) MarkSignalDisconnected(err error) { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.signalState = false d.signalError = err + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // MarkSignalConnected sets SignalState to connected func (d *Status) MarkSignalConnected() { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.signalState = true d.signalError = nil + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) { @@ -1012,18 +1076,17 @@ func (d *Status) RemoveConnectionListener() { d.notifier.removeListener() } -func (d *Status) onConnectionChanged() { - d.notifier.updateServerStates(d.managementState, d.signalState) -} - -// notifyPeerStateChangeListeners notifies route manager about the change in peer state -func (d *Status) notifyPeerStateChangeListeners(peerID string) { - subs, ok := d.changeNotify[peerID] - if !ok { - return +// snapshotRouterPeersLocked builds the RouterState map for a peer's subscribers. +// Caller MUST hold d.mux. Returns nil when there are no subscribers for peerID +// or when notify is false. The snapshot is consumed later by dispatchRouterPeers +// outside the lock so the channel send cannot stall any d.mux holder. +func (d *Status) snapshotRouterPeersLocked(peerID string, notify bool) map[string]RouterState { + if !notify { + return nil + } + if _, ok := d.changeNotify[peerID]; !ok { + return nil } - - // collect the relevant data for router peers routerPeers := make(map[string]RouterState, len(d.changeNotify)) for pid := range d.changeNotify { s, ok := d.peers[pid] @@ -1031,13 +1094,35 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { log.Warnf("router peer not found in peers list: %s", pid) continue } - routerPeers[pid] = RouterState{ Status: s.ConnStatus, Relayed: s.Relayed, Latency: s.Latency, } } + return routerPeers +} + +// dispatchRouterPeers delivers a previously snapshotted router-state map to +// the peer's subscribers. Caller MUST NOT hold d.mux. The method takes a +// fresh, short read of d.changeNotify under the lock to grab subscriber +// channels, then sends outside the lock so a slow consumer cannot block other +// d.mux holders. The send itself stays blocking (only short-circuited by the +// subscriber's context) so peer state transitions are not silently dropped. +func (d *Status) dispatchRouterPeers(peerID string, routerPeers map[string]RouterState) { + if routerPeers == nil { + return + } + + d.mux.Lock() + subsMap, ok := d.changeNotify[peerID] + subs := make([]*StatusChangeSubscription, 0, len(subsMap)) + if ok { + for _, sub := range subsMap { + subs = append(subs, sub) + } + } + d.mux.Unlock() for _, sub := range subs { select { @@ -1047,14 +1132,6 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { } } -func (d *Status) notifyPeerListChanged() { - d.notifier.peerListChanged(d.numOfPeers()) -} - -func (d *Status) notifyAddressChanged() { - d.notifier.localAddressChanged(d.localPeer.FQDN, d.localPeer.IP) -} - func (d *Status) numOfPeers() int { return len(d.peers) + len(d.offlinePeers) }