From 06cc488e90b68cc00fb3bf69420914312d95f22f Mon Sep 17 00:00:00 2001 From: mlsmaycon Date: Thu, 21 May 2026 11:25:56 +0200 Subject: [PATCH] fix(proxy): wire PR #6207 snapshot-apply metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SyncMappings restore in 036e91cde kept the metric definitions (RecordSnapshotSyncDuration, RecordSnapshotBatchDuration, RecordAddPeerDuration) and the corresponding callbacks (OnAddPeer) but lost their call sites — they shipped as dead code. - proxy/server.go: introduce snapshotTracker (the type PR #6207 added to share batch/sync timing between handleMappingStream and handleSyncMappingsStream); both stream handlers now go through it. - proxy/internal/roundtrip/netbird.go: add OnAddPeer struct field and invoke it after createClientEntry with the per-call duration. - proxy/server.go: wire s.netbird.OnAddPeer = s.meter.RecordAddPeerDuration alongside the existing NetBird construction. No new test coverage — PR #6207's bench tests already exercise the batch/sync paths and continue to pass. --- proxy/internal/roundtrip/netbird.go | 9 +++ proxy/server.go | 102 ++++++++++++++++------------ 2 files changed, 67 insertions(+), 44 deletions(-) diff --git a/proxy/internal/roundtrip/netbird.go b/proxy/internal/roundtrip/netbird.go index 755a49d51..27b15d5a6 100644 --- a/proxy/internal/roundtrip/netbird.go +++ b/proxy/internal/roundtrip/netbird.go @@ -170,6 +170,11 @@ type NetBird struct { // stopHandler runs when an account's last service is removed (or the // transport is shutting down). Receives whatever readyHandler returned. stopHandler func(accountID types.AccountID, state any) + + // OnAddPeer, when set, is called after AddPeer completes for a new account + // (i.e. when a new client was actually created, not when an existing one + // was reused). The duration covers keygen + gRPC CreateProxyPeer + embed.New. + OnAddPeer func(d time.Duration, err error) } // ClientDebugInfo contains debug information about a client. @@ -217,7 +222,11 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se return nil } + createStart := time.Now() entry, err := n.createClientEntry(ctx, accountID, key, authToken, si) + if n.OnAddPeer != nil { + n.OnAddPeer(time.Since(createStart), err) + } if err != nil { n.clientsMux.Unlock() return err diff --git a/proxy/server.go b/proxy/server.go index f341abe09..7faca6a94 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -534,6 +534,7 @@ func (s *Server) initNetBirdClient() { // the embedded client never accepts inbound, so block. BlockInbound: !s.Private, }, s.Logger, s, s.mgmtClient) + s.netbird.OnAddPeer = s.meter.RecordAddPeerDuration } // initReverseProxy builds the meter-instrumented reverse proxy. MultiTransport @@ -1273,17 +1274,14 @@ func isSyncUnimplemented(err error) bool { // stream, sending an ack after each batch is fully processed. Management waits // for the ack before sending the next batch, providing application-level // back-pressure. -func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool, _ time.Time) error { +func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool, connectTime time.Time) error { select { case <-s.routerReady: case <-ctx.Done(): return ctx.Err() } - var snapshotIDs map[types.ServiceID]struct{} - if !*initialSyncDone { - snapshotIDs = make(map[types.ServiceID]struct{}) - } + tracker := s.newSnapshotTracker(initialSyncDone, connectTime) for { select { @@ -1297,24 +1295,12 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox case err != nil: return fmt.Errorf("receive msg: %w", err) } + + batchStart := time.Now() s.Logger.Debug("Received mapping update, starting processing") s.processMappings(ctx, msg.GetMapping()) s.Logger.Debug("Processing mapping update completed") - - if !*initialSyncDone { - for _, m := range msg.GetMapping() { - snapshotIDs[types.ServiceID(m.GetId())] = struct{}{} - } - if msg.GetInitialSyncComplete() { - s.reconcileSnapshot(ctx, snapshotIDs) - snapshotIDs = nil - if s.healthChecker != nil { - s.healthChecker.SetInitialSyncComplete() - } - *initialSyncDone = true - s.Logger.Info("Initial mapping sync complete") - } - } + tracker.recordBatch(ctx, s, msg.GetMapping(), msg.GetInitialSyncComplete(), batchStart) if err := stream.Send(&proto.SyncMappingsRequest{ Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, @@ -1325,52 +1311,80 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox } } -func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool, _ time.Time) error { +// snapshotTracker accumulates service IDs during the initial snapshot and +// finalises sync state when the complete flag arrives. Used by both +// handleMappingStream and handleSyncMappingsStream so metric emission and +// reconciliation behave identically on either RPC. +type snapshotTracker struct { + done *bool + connectTime time.Time + snapshotIDs map[types.ServiceID]struct{} +} + +func (s *Server) newSnapshotTracker(done *bool, connectTime time.Time) *snapshotTracker { + var ids map[types.ServiceID]struct{} + if !*done { + ids = make(map[types.ServiceID]struct{}) + } + return &snapshotTracker{done: done, connectTime: connectTime, snapshotIDs: ids} +} + +func (t *snapshotTracker) recordBatch(ctx context.Context, s *Server, mappings []*proto.ProxyMapping, syncComplete bool, batchStart time.Time) { + if *t.done { + return + } + + if s.meter != nil { + s.meter.RecordSnapshotBatchDuration(time.Since(batchStart)) + } + + for _, m := range mappings { + t.snapshotIDs[types.ServiceID(m.GetId())] = struct{}{} + } + + if !syncComplete { + return + } + + s.reconcileSnapshot(ctx, t.snapshotIDs) + t.snapshotIDs = nil + if s.healthChecker != nil { + s.healthChecker.SetInitialSyncComplete() + } + *t.done = true + if s.meter != nil { + s.meter.RecordSnapshotSyncDuration(time.Since(t.connectTime)) + } + s.Logger.Info("Initial mapping sync complete") +} + +func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool, connectTime time.Time) error { select { case <-s.routerReady: case <-ctx.Done(): return ctx.Err() } - var snapshotIDs map[types.ServiceID]struct{} - if !*initialSyncDone { - snapshotIDs = make(map[types.ServiceID]struct{}) - } + tracker := s.newSnapshotTracker(initialSyncDone, connectTime) for { - // Check for context completion to gracefully shutdown. select { case <-ctx.Done(): - // Shutting down. return ctx.Err() default: msg, err := mappingClient.Recv() switch { case errors.Is(err, io.EOF): - // Mapping connection gracefully terminated by server. return nil case err != nil: - // Something has gone horribly wrong, return and hope the parent retries the connection. return fmt.Errorf("receive msg: %w", err) } + + batchStart := time.Now() s.Logger.Debug("Received mapping update, starting processing") s.processMappings(ctx, msg.GetMapping()) s.Logger.Debug("Processing mapping update completed") - - if !*initialSyncDone { - for _, m := range msg.GetMapping() { - snapshotIDs[types.ServiceID(m.GetId())] = struct{}{} - } - if msg.GetInitialSyncComplete() { - s.reconcileSnapshot(ctx, snapshotIDs) - snapshotIDs = nil - if s.healthChecker != nil { - s.healthChecker.SetInitialSyncComplete() - } - *initialSyncDone = true - s.Logger.Info("Initial mapping sync complete") - } - } + tracker.recordBatch(ctx, s, msg.GetMapping(), msg.GetInitialSyncComplete(), batchStart) } } }