From 9d11257b1afe89b848099867e6e485bc79550952 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 11 Jun 2025 13:33:38 +0200 Subject: [PATCH] [client] Carry the peer's actual state with the notification. (#3929) - Removed separate thread execution of GetStates during notifications. - Updated notification handler to rely on state data included in the notification payload. --- client/internal/peer/status.go | 49 +++++++++++++------ client/internal/routemanager/client/client.go | 39 ++++++++++----- .../routemanager/client/client_bench_test.go | 5 +- 3 files changed, 66 insertions(+), 27 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index abeafd757..629afec9b 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -41,6 +41,13 @@ type EventListener interface { OnEvent(event *proto.SystemEvent) } +// RouterState status for router peers. This contains relevant fields for route manager +type RouterState struct { + Status ConnStatus + Relayed bool + Latency time.Duration +} + // State contains the latest state of a peer type State struct { Mux *sync.RWMutex @@ -155,20 +162,21 @@ type FullStatus struct { type StatusChangeSubscription struct { peerID string id string - eventsChan chan struct{} + eventsChan chan map[string]RouterState ctx context.Context } func newStatusChangeSubscription(ctx context.Context, peerID string) *StatusChangeSubscription { return &StatusChangeSubscription{ - ctx: ctx, - peerID: peerID, - id: uuid.New().String(), - eventsChan: make(chan struct{}, 1), + ctx: ctx, + peerID: peerID, + id: uuid.New().String(), + // it is a buffer for notifications to block less the status recorded + eventsChan: make(chan map[string]RouterState, 8), } } -func (s *StatusChangeSubscription) Events() chan struct{} { +func (s *StatusChangeSubscription) Events() chan map[string]RouterState { return s.eventsChan } @@ -995,15 +1003,28 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { if !ok { return } + + // collect the relevant data for router peers + routerPeers := make(map[string]RouterState, len(d.changeNotify)) + for pid := range d.changeNotify { + s, ok := d.peers[pid] + if !ok { + log.Warnf("router peer not found in peers list: %s", pid) + continue + } + + routerPeers[pid] = RouterState{ + Status: s.ConnStatus, + Relayed: s.Relayed, + Latency: s.Latency, + } + } + for _, sub := range subs { - // block the write because we do not want to miss notification - // must have to be sure we will run the GetPeerState() on separated thread - go func() { - select { - case sub.eventsChan <- struct{}{}: - case <-sub.ctx.Done(): - } - }() + select { + case sub.eventsChan <- routerPeers: + case <-sub.ctx.Done(): + } } } diff --git a/client/internal/routemanager/client/client.go b/client/internal/routemanager/client/client.go index 6e3cf61c9..11c0f5708 100644 --- a/client/internal/routemanager/client/client.go +++ b/client/internal/routemanager/client/client.go @@ -76,7 +76,7 @@ type Watcher struct { wgInterface iface.WGIface routes map[route.ID]*route.Route routeUpdate chan RoutesUpdate - peerStateUpdate chan struct{} + peerStateUpdate chan map[string]peer.RouterState routePeersNotifiers map[string]chan struct{} // map of peer key to channel for peer state changes currentChosen *route.Route currentChosenStatus *routerPeerStatus @@ -95,7 +95,7 @@ func NewWatcher(config WatcherConfig) *Watcher { routes: make(map[route.ID]*route.Route), routePeersNotifiers: make(map[string]chan struct{}), routeUpdate: make(chan RoutesUpdate), - peerStateUpdate: make(chan struct{}), + peerStateUpdate: make(chan map[string]peer.RouterState), handler: config.Handler, currentChosenStatus: nil, } @@ -119,6 +119,23 @@ func (w *Watcher) getRouterPeerStatuses() map[route.ID]routerPeerStatus { return routePeerStatuses } +func (w *Watcher) convertRouterPeerStatuses(states map[string]peer.RouterState) map[route.ID]routerPeerStatus { + routePeerStatuses := make(map[route.ID]routerPeerStatus) + for _, r := range w.routes { + peerStatus, ok := states[r.Peer] + if !ok { + log.Warnf("couldn't fetch peer state: %v", r.Peer) + continue + } + routePeerStatuses[r.ID] = routerPeerStatus{ + status: peerStatus.Status, + relayed: peerStatus.Relayed, + latency: peerStatus.Latency, + } + } + return routePeerStatuses +} + // getBestRouteFromStatuses determines the most optimal route from the available routes // within a Watcher, taking into account peer connection status, route metrics, and // preference for non-relayed and direct connections. @@ -237,7 +254,7 @@ func (w *Watcher) getBestRouteFromStatuses(routePeerStatuses map[route.ID]router return chosen, chosenStatus } -func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) { +func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan map[string]peer.RouterState, closer chan struct{}) { subscription := w.statusRecorder.SubscribeToPeerStateChanges(ctx, peerKey) defer w.statusRecorder.UnsubscribePeerStateChanges(subscription) @@ -247,8 +264,8 @@ func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, pe return case <-closer: return - case <-subscription.Events(): - peerStateUpdate <- struct{}{} + case routerStates := <-subscription.Events(): + peerStateUpdate <- routerStates log.Debugf("triggered route state update for Peer: %s", peerKey) } } @@ -312,9 +329,7 @@ func (w *Watcher) shouldSkipRecalculation(newChosenID route.ID, newStatus router return true } -func (w *Watcher) recalculateRoutes(rsn reason) error { - routerPeerStatuses := w.getRouterPeerStatuses() - +func (w *Watcher) recalculateRoutes(rsn reason, routerPeerStatuses map[route.ID]routerPeerStatus) error { newChosenID, newStatus := w.getBestRouteFromStatuses(routerPeerStatuses) // If no route is chosen, remove the route from the peer @@ -487,8 +502,9 @@ func (w *Watcher) Start() { select { case <-w.ctx.Done(): return - case <-w.peerStateUpdate: - if err := w.recalculateRoutes(reasonPeerUpdate); err != nil { + case routersStates := <-w.peerStateUpdate: + routerPeerStatuses := w.convertRouterPeerStatuses(routersStates) + if err := w.recalculateRoutes(reasonPeerUpdate, routerPeerStatuses); err != nil { log.Errorf("Failed to recalculate routes for network [%v]: %v", w.handler, err) } case update := <-w.routeUpdate: @@ -512,7 +528,8 @@ func (w *Watcher) handleRouteUpdate(update RoutesUpdate) { if isTrueRouteUpdate { log.Debugf("client network update %v for [%v] contains different routes, recalculating routes", update.UpdateSerial, w.handler) - if err := w.recalculateRoutes(reasonRouteUpdate); err != nil { + routePeerStatuses := w.getRouterPeerStatuses() + if err := w.recalculateRoutes(reasonRouteUpdate, routePeerStatuses); err != nil { log.Errorf("failed to recalculate routes for network [%v]: %v", w.handler, err) } } else { diff --git a/client/internal/routemanager/client/client_bench_test.go b/client/internal/routemanager/client/client_bench_test.go index 1fc41ec33..591042ac5 100644 --- a/client/internal/routemanager/client/client_bench_test.go +++ b/client/internal/routemanager/client/client_bench_test.go @@ -136,7 +136,7 @@ func BenchmarkRecalculateRoutes(b *testing.B) { routes: routes, routePeersNotifiers: make(map[string]chan struct{}), routeUpdate: make(chan RoutesUpdate), - peerStateUpdate: make(chan struct{}), + peerStateUpdate: make(chan map[string]peer.RouterState), handler: &mockRouteHandler{network: "benchmark"}, currentChosenStatus: nil, } @@ -144,8 +144,9 @@ func BenchmarkRecalculateRoutes(b *testing.B) { b.ResetTimer() b.ReportAllocs() + routePeerStatuses := watcher.getRouterPeerStatuses() for i := 0; i < b.N; i++ { - err := watcher.recalculateRoutes(reasonPeerUpdate) + err := watcher.recalculateRoutes(reasonPeerUpdate, routePeerStatuses) if err != nil { b.Fatalf("recalculateRoutes failed: %v", err) }