mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-05 00:26:39 +00:00
The Status recorder used to fire notifier callbacks while holding d.mux: - notifyPeerListChanged / notifyPeerStateChangeListeners ran from inside the locked section of every Update*/AddPeerStateRoute/etc. - notifyAddressChanged ran from UpdateLocalPeerState and CleanLocalPeerState while d.mux was held. - onConnectionChanged was registered with a defer above defer d.mux.Unlock, so it executed before the mutex was released in the Mark*Connected/ Disconnected helpers. - notifyPeerStateChangeListeners did a blocking channel send under d.mux, so a slow subscriber stalled every other d.mux holder. A listener that re-enters the recorder (e.g. calls GetFullStatus from within a callback) deadlocks against d.mux, and any callback that takes longer than expected stalls every other state query for its duration. Capture the values needed for notification under the lock, release d.mux, then call the notifier. Build per-peer router-state snapshots inside the lock and dispatch them via dispatchRouterPeers afterwards. The router-peer channel send stays blocking, but now happens outside d.mux so a slow consumer cannot stall any other d.mux holder, and no peer state transitions are silently dropped. The notifier itself is unchanged: its internal state was already protected by its own locks, and the field d.notifier is set once in NewRecorder and never reassigned, so reading it without d.mux is safe. Also fix a pre-existing race in Test_notifier_RemoveListener / Test_notifier_SetListener: setListener spawns a goroutine that writes listener.peers, but the tests read listener.peers without waiting for it.
115 lines
2.4 KiB
Go
115 lines
2.4 KiB
Go
package peer
|
|
|
|
import (
|
|
"sync"
|
|
"testing"
|
|
)
|
|
|
|
type mocListener struct {
|
|
lastState int
|
|
wg sync.WaitGroup
|
|
peersWg sync.WaitGroup
|
|
peers int
|
|
}
|
|
|
|
func (l *mocListener) OnConnected() {
|
|
l.lastState = stateConnected
|
|
l.wg.Done()
|
|
}
|
|
func (l *mocListener) OnDisconnected() {
|
|
l.lastState = stateDisconnected
|
|
l.wg.Done()
|
|
}
|
|
func (l *mocListener) OnConnecting() {
|
|
l.lastState = stateConnecting
|
|
l.wg.Done()
|
|
}
|
|
func (l *mocListener) OnDisconnecting() {
|
|
l.lastState = stateDisconnecting
|
|
l.wg.Done()
|
|
}
|
|
|
|
func (l *mocListener) OnAddressChanged(host, addr string) {
|
|
|
|
}
|
|
func (l *mocListener) OnPeersListChanged(size int) {
|
|
l.peers = size
|
|
l.peersWg.Done()
|
|
}
|
|
|
|
func (l *mocListener) setWaiter() {
|
|
l.wg.Add(1)
|
|
}
|
|
|
|
func (l *mocListener) wait() {
|
|
l.wg.Wait()
|
|
}
|
|
|
|
func (l *mocListener) setPeersWaiter() {
|
|
l.peersWg.Add(1)
|
|
}
|
|
|
|
func (l *mocListener) waitPeers() {
|
|
l.peersWg.Wait()
|
|
}
|
|
|
|
func Test_notifier_serverState(t *testing.T) {
|
|
|
|
type scenario struct {
|
|
name string
|
|
expected int
|
|
mgmState bool
|
|
signalState bool
|
|
}
|
|
scenarios := []scenario{
|
|
{"connected", stateConnected, true, true},
|
|
{"mgm down", stateConnecting, false, true},
|
|
{"signal down", stateConnecting, true, false},
|
|
{"disconnected", stateDisconnected, false, false},
|
|
}
|
|
|
|
for _, tt := range scenarios {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
n := newNotifier()
|
|
n.updateServerStates(tt.mgmState, tt.signalState)
|
|
if n.lastNotification != tt.expected {
|
|
t.Errorf("invalid serverstate: %d, expected: %d", n.lastNotification, tt.expected)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func Test_notifier_SetListener(t *testing.T) {
|
|
listener := &mocListener{}
|
|
listener.setWaiter()
|
|
listener.setPeersWaiter()
|
|
|
|
n := newNotifier()
|
|
n.lastNotification = stateConnecting
|
|
n.setListener(listener)
|
|
listener.wait()
|
|
listener.waitPeers()
|
|
if listener.lastState != n.lastNotification {
|
|
t.Errorf("invalid state: %d, expected: %d", listener.lastState, n.lastNotification)
|
|
}
|
|
}
|
|
|
|
func Test_notifier_RemoveListener(t *testing.T) {
|
|
listener := &mocListener{}
|
|
listener.setWaiter()
|
|
listener.setPeersWaiter()
|
|
n := newNotifier()
|
|
n.lastNotification = stateConnecting
|
|
n.setListener(listener)
|
|
// setListener replays cached state on a goroutine; wait for both the state
|
|
// and peers callbacks to finish so we don't race on listener.peers.
|
|
listener.wait()
|
|
listener.waitPeers()
|
|
n.removeListener()
|
|
n.peerListChanged(1)
|
|
|
|
if listener.peers != 0 {
|
|
t.Errorf("invalid state: %d", listener.peers)
|
|
}
|
|
}
|