From 855d21c37eb253a7266f9e3de13dcbeeed3d9b3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 16:18:31 +0200 Subject: [PATCH 1/6] Refactor peer state change subscription mechanism Because the code generated new channel for every single event, was easy to miss notification. Use single channel. --- client/internal/peer/status.go | 78 ++++++++++++++++++++------ client/internal/peer/status_test.go | 6 +- client/internal/routemanager/client.go | 11 ++-- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 40956e68e..52c7fb24b 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -146,11 +146,33 @@ type FullStatus struct { LazyConnectionEnabled bool } +type StatusChangeSubscription struct { + peerID string + id string + eventsChan chan struct{} +} + +func newStatusChangeSubscription(peerID string) *StatusChangeSubscription { + return &StatusChangeSubscription{ + peerID: peerID, + id: uuid.New().String(), + eventsChan: make(chan struct{}, 1), + } +} + +func (s *StatusChangeSubscription) Events() chan struct{} { + return s.eventsChan +} + +func (s *StatusChangeSubscription) close() { + close(s.eventsChan) +} + // Status holds a state of peers, signal, management connections and relays type Status struct { mux sync.Mutex peers map[string]State - changeNotify map[string]chan struct{} + changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription signalState bool signalError error managementState bool @@ -187,7 +209,7 @@ type Status struct { func NewRecorder(mgmAddress string) *Status { return &Status{ peers: make(map[string]State), - changeNotify: make(map[string]chan struct{}), + changeNotify: make(map[string]map[string]*StatusChangeSubscription), eventStreams: make(map[string]chan *proto.SystemEvent), eventQueue: NewEventQueue(eventQueueSize), offlinePeers: make([]State, 0), @@ -312,7 +334,6 @@ func (d *Status) UpdatePeerState(receivedState State) error { // when we close the connection we will not notify the router manager if receivedState.ConnStatus == StatusIdle { d.notifyPeerStateChangeListeners(receivedState.PubKey) - } return nil } @@ -552,19 +573,42 @@ func (d *Status) FinishPeerListModifications() { d.notifyPeerListChanged() } -// GetPeerStateChangeNotifier returns a change notifier channel for a peer -func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} { +func (d *Status) SubscribeToPeerStateChanges(peerID string) *StatusChangeSubscription { d.mux.Lock() defer d.mux.Unlock() - ch, found := d.changeNotify[peer] - if found { - return ch + sub := newStatusChangeSubscription(peerID) + if _, ok := d.changeNotify[peerID]; !ok { + d.changeNotify[peerID] = make(map[string]*StatusChangeSubscription) + } + d.changeNotify[peerID][sub.id] = sub + + return sub +} + +func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) { + d.mux.Lock() + defer d.mux.Unlock() + + if subscription == nil { + return } - ch = make(chan struct{}) - d.changeNotify[peer] = ch - return ch + channels, ok := d.changeNotify[subscription.peerID] + if !ok { + return + } + + sub, exists := channels[subscription.id] + if !exists { + return + } + + sub.close() + delete(channels, subscription.id) + if len(channels) == 0 { + delete(d.changeNotify, sub.peerID) + } } // GetLocalPeerState returns the local peer state @@ -939,13 +983,15 @@ func (d *Status) onConnectionChanged() { // notifyPeerStateChangeListeners notifies route manager about the change in peer state func (d *Status) notifyPeerStateChangeListeners(peerID string) { - ch, found := d.changeNotify[peerID] - if !found { + subs, ok := d.changeNotify[peerID] + if !ok { return } - - close(ch) - delete(d.changeNotify, peerID) + for _, sub := range subs { + select { + case sub.eventsChan <- struct{}{}: + } + } } func (d *Status) notifyPeerListChanged() { diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 8f28a9862..4d7a4ca78 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -86,8 +86,8 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { status := NewRecorder("https://mgm") _ = status.AddPeer(key, "abc.netbird", ip) - ch := status.GetPeerStateChangeNotifier(key) - assert.NotNil(t, ch, "channel shouldn't be nil") + sub := status.SubscribeToPeerStateChanges(key) + assert.NotNil(t, sub, "channel shouldn't be nil") peerState := State{ PubKey: key, @@ -100,7 +100,7 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { assert.NoError(t, err, "shouldn't return error") select { - case <-ch: + case <-sub.eventsChan: default: t.Errorf("channel wasn't closed after update") } diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 137e00d31..797ef6c48 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -224,19 +224,18 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] } func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) { + subscription := c.statusRecorder.SubscribeToPeerStateChanges(peerKey) + defer c.statusRecorder.UnsubscribePeerStateChanges(subscription) + for { select { case <-ctx.Done(): return case <-closer: return - case <-c.statusRecorder.GetPeerStateChangeNotifier(peerKey): - state, err := c.statusRecorder.GetPeer(peerKey) - if err != nil { - continue - } + case <-subscription.Events(): peerStateUpdate <- struct{}{} - log.Debugf("triggered route state update for Peer %s, state: %s", peerKey, state.ConnStatus) + log.Debugf("triggered route state update for Peer: %s", peerKey) } } } From fbdccbc2a1a6f6b29e7b4f01ce7cfe108dcf8465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 16:47:29 +0200 Subject: [PATCH 2/6] Fix lint --- client/internal/peer/status.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 52c7fb24b..abb2aa7b9 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -988,9 +988,9 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { return } for _, sub := range subs { - select { - case sub.eventsChan <- struct{}{}: - } + // block the write because we do not want to miss notification + // must have to be sure we will run the GetPeerState() on separated thread + sub.eventsChan <- struct{}{} } } From 2a3623d6ac9b0d83a7012daac632310bd66dfed7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 16:55:33 +0200 Subject: [PATCH 3/6] Avoid potential deadlock --- client/internal/peer/status.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index abb2aa7b9..d3375f107 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -164,10 +164,6 @@ func (s *StatusChangeSubscription) Events() chan struct{} { return s.eventsChan } -func (s *StatusChangeSubscription) close() { - close(s.eventsChan) -} - // Status holds a state of peers, signal, management connections and relays type Status struct { mux sync.Mutex @@ -604,7 +600,6 @@ func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscript return } - sub.close() delete(channels, subscription.id) if len(channels) == 0 { delete(d.changeNotify, sub.peerID) @@ -990,7 +985,9 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { for _, sub := range subs { // block the write because we do not want to miss notification // must have to be sure we will run the GetPeerState() on separated thread - sub.eventsChan <- struct{}{} + go func() { + sub.eventsChan <- struct{}{} + }() } } From 8be95a2efb31d598936f2a4195f375555ed5237a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 17:11:25 +0200 Subject: [PATCH 4/6] Fix test --- client/internal/peer/status_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 4d7a4ca78..33ea4ee0b 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -1,6 +1,7 @@ package peer import ( + "context" "errors" "sync" "testing" @@ -99,10 +100,12 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { err := status.UpdatePeerRelayedStateToDisconnected(peerState) assert.NoError(t, err, "shouldn't return error") + timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() select { case <-sub.eventsChan: - default: - t.Errorf("channel wasn't closed after update") + case <-timeoutCtx.Done(): + t.Errorf("timed out waiting for event") } } From 3ecab1f31e8a5db9368f723739643a9a8a3dd58c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 18:43:08 +0200 Subject: [PATCH 5/6] Add context --- client/internal/peer/status.go | 14 ++++++++++---- client/internal/routemanager/client.go | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index d3375f107..0c6aac372 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -1,6 +1,7 @@ package peer import ( + "context" "errors" "net/netip" "slices" @@ -150,10 +151,12 @@ type StatusChangeSubscription struct { peerID string id string eventsChan chan struct{} + ctx context.Context } -func newStatusChangeSubscription(peerID string) *StatusChangeSubscription { +func newStatusChangeSubscription(ctx context.Context, peerID string) *StatusChangeSubscription { return &StatusChangeSubscription{ + ctx: ctx, peerID: peerID, id: uuid.New().String(), eventsChan: make(chan struct{}, 1), @@ -569,11 +572,11 @@ func (d *Status) FinishPeerListModifications() { d.notifyPeerListChanged() } -func (d *Status) SubscribeToPeerStateChanges(peerID string) *StatusChangeSubscription { +func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription { d.mux.Lock() defer d.mux.Unlock() - sub := newStatusChangeSubscription(peerID) + sub := newStatusChangeSubscription(ctx, peerID) if _, ok := d.changeNotify[peerID]; !ok { d.changeNotify[peerID] = make(map[string]*StatusChangeSubscription) } @@ -986,7 +989,10 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { // block the write because we do not want to miss notification // must have to be sure we will run the GetPeerState() on separated thread go func() { - sub.eventsChan <- struct{}{} + select { + case sub.eventsChan <- struct{}{}: + case <-sub.ctx.Done(): + } }() } } diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 797ef6c48..bff954c27 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -224,7 +224,7 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] } func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) { - subscription := c.statusRecorder.SubscribeToPeerStateChanges(peerKey) + subscription := c.statusRecorder.SubscribeToPeerStateChanges(ctx, peerKey) defer c.statusRecorder.UnsubscribePeerStateChanges(subscription) for { From aa2662a2bb15344b5e247870b28cbeb3be815678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Mon, 2 Jun 2025 19:08:22 +0200 Subject: [PATCH 6/6] Fix test --- client/internal/peer/status_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 33ea4ee0b..272638750 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -87,7 +87,7 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { status := NewRecorder("https://mgm") _ = status.AddPeer(key, "abc.netbird", ip) - sub := status.SubscribeToPeerStateChanges(key) + sub := status.SubscribeToPeerStateChanges(context.Background(), key) assert.NotNil(t, sub, "channel shouldn't be nil") peerState := State{