From 420802374bb461b99198e3cd6c6cf7373e75409a Mon Sep 17 00:00:00 2001 From: pascal Date: Mon, 18 May 2026 19:23:18 +0200 Subject: [PATCH] add metrics --- proxy/handle_mapping_stream_test.go | 7 ++-- proxy/internal/metrics/metrics.go | 59 +++++++++++++++++++++++++++++ proxy/internal/roundtrip/netbird.go | 9 +++++ proxy/server.go | 29 ++++++++++++-- proxy/snapshot_reconcile_test.go | 9 +++-- 5 files changed, 102 insertions(+), 11 deletions(-) diff --git a/proxy/handle_mapping_stream_test.go b/proxy/handle_mapping_stream_test.go index cb16c0814..67c399e44 100644 --- a/proxy/handle_mapping_stream_test.go +++ b/proxy/handle_mapping_stream_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "testing" + "time" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -59,7 +60,7 @@ func TestHandleMappingStream_SyncCompleteFlag(t *testing.T) { } syncDone := false - err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) assert.NoError(t, err) assert.True(t, syncDone, "initial sync should be marked done when flag is set") } @@ -79,7 +80,7 @@ func TestHandleMappingStream_NoSyncFlagDoesNotMarkDone(t *testing.T) { } syncDone := false - err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) assert.NoError(t, err) assert.False(t, syncDone, "initial sync should not be marked done without flag") } @@ -97,7 +98,7 @@ func TestHandleMappingStream_NilHealthChecker(t *testing.T) { } syncDone := false - err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) assert.NoError(t, err) assert.True(t, syncDone, "sync done flag should be set even without health checker") } diff --git a/proxy/internal/metrics/metrics.go b/proxy/internal/metrics/metrics.go index 573485625..53c134a84 100644 
--- a/proxy/internal/metrics/metrics.go
+++ b/proxy/internal/metrics/metrics.go
@@ -25,6 +25,11 @@ type Metrics struct {
 	backendDuration          metric.Int64Histogram
 	certificateIssueDuration metric.Int64Histogram
 
+	// Management sync metrics (all three histograms record whole milliseconds).
+	snapshotSyncDuration  metric.Int64Histogram
+	snapshotBatchDuration metric.Int64Histogram
+	addPeerDuration       metric.Int64Histogram
+
 	// L4 service-level metrics.
 	l4Services metric.Int64UpDownCounter
 
@@ -54,6 +59,9 @@ func New(ctx context.Context, meter metric.Meter) (*Metrics, error) {
 	if err := m.initHTTPMetrics(meter); err != nil {
 		return nil, err
 	}
+	if err := m.initSyncMetrics(meter); err != nil {
+		return nil, err
+	}
 	if err := m.initL4Metrics(meter); err != nil {
 		return nil, err
 	}
@@ -126,6 +134,57 @@ func (m *Metrics) initHTTPMetrics(meter metric.Meter) error {
 	return err
 }
 
+func (m *Metrics) initSyncMetrics(meter metric.Meter) error {
+	var err error
+
+	m.snapshotSyncDuration, err = meter.Int64Histogram(
+		"proxy.sync.snapshot.duration.ms",
+		metric.WithUnit("milliseconds"),
+		metric.WithDescription("Duration from management connect until the initial snapshot sync is complete"),
+	)
+	if err != nil {
+		return err
+	}
+
+	m.snapshotBatchDuration, err = meter.Int64Histogram(
+		"proxy.sync.batch.duration.ms",
+		metric.WithUnit("milliseconds"),
+		metric.WithDescription("Duration to process a single mapping batch during initial snapshot sync"),
+	)
+	if err != nil {
+		return err
+	}
+
+	m.addPeerDuration, err = meter.Int64Histogram(
+		"proxy.peer.add.duration.ms",
+		metric.WithUnit("milliseconds"),
+		metric.WithDescription("Duration to add a peer for an account (keygen + gRPC CreateProxyPeer + embed.New)"),
+		metric.WithExplicitBucketBoundaries(10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000),
+	)
+	return err
+}
+
+// RecordSnapshotSyncDuration records d, truncated to whole milliseconds, as the total time from connect to sync-complete.
+func (m *Metrics) RecordSnapshotSyncDuration(d time.Duration) {
+	m.snapshotSyncDuration.Record(m.ctx, d.Milliseconds())
+}
+
+// RecordSnapshotBatchDuration records d, truncated to whole milliseconds, as the time to process one mapping batch during initial sync.
+func (m *Metrics) RecordSnapshotBatchDuration(d time.Duration) {
+	m.snapshotBatchDuration.Record(m.ctx, d.Milliseconds())
+}
+
+// RecordAddPeerDuration records the time to create a new peer for an account, tagged result="success" when err is nil and result="error" otherwise.
+func (m *Metrics) RecordAddPeerDuration(d time.Duration, err error) {
+	result := "success"
+	if err != nil {
+		result = "error"
+	}
+	m.addPeerDuration.Record(m.ctx, d.Milliseconds(), metric.WithAttributes(
+		attribute.String("result", result),
+	))
+}
+
 func (m *Metrics) initL4Metrics(meter metric.Meter) error {
 	var err error
 
diff --git a/proxy/internal/roundtrip/netbird.go b/proxy/internal/roundtrip/netbird.go
index ce9e8ddf3..28eba7810 100644
--- a/proxy/internal/roundtrip/netbird.go
+++ b/proxy/internal/roundtrip/netbird.go
@@ -142,6 +142,11 @@ type NetBird struct {
 	clients        map[types.AccountID]*clientEntry
 	initLogOnce    sync.Once
 	statusNotifier statusNotifier
+
+	// OnAddPeer, when set, is called by AddPeer right after it creates a client
+	// for a new account (not when an existing one is reused), on success and on
+	// failure alike. The duration covers keygen + gRPC CreateProxyPeer + embed.New.
+	OnAddPeer func(d time.Duration, err error)
 }
 
 // ClientDebugInfo contains debug information about a client.
@@ -215,7 +220,11 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se n.clients[accountID] = entry n.clientsMux.Unlock() + createStart := time.Now() created, err := n.createClientEntry(ctx, accountID, key, authToken, si) + if n.OnAddPeer != nil { + n.OnAddPeer(time.Since(createStart), err) + } if err != nil { entry.initErr = err close(entry.ready) diff --git a/proxy/server.go b/proxy/server.go index 775a650bf..ea782e947 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -284,6 +284,7 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) (err error) { WGPort: s.WireguardPort, PreSharedKey: s.PreSharedKey, }, s.Logger, s, s.mgmtClient) + s.netbird.OnAddPeer = s.meter.RecordAddPeerDuration // Create health checker before the mapping worker so it can track // management connectivity from the first stream connection. @@ -999,6 +1000,7 @@ func (s *Server) proxyCapabilities() *proto.ProxyCapabilities { } func (s *Server) tryGetMappingUpdate(ctx context.Context, client proto.ProxyServiceClient, initialSyncDone *bool) error { + connectTime := time.Now() mappingClient, err := client.GetMappingUpdate(ctx, &proto.GetMappingUpdateRequest{ ProxyId: s.ID, Version: s.Version, @@ -1015,10 +1017,11 @@ func (s *Server) tryGetMappingUpdate(ctx context.Context, client proto.ProxyServ } s.Logger.Debug("management mapping stream established (GetMappingUpdate)") - return s.handleMappingStream(ctx, mappingClient, initialSyncDone) + return s.handleMappingStream(ctx, mappingClient, initialSyncDone, connectTime) } func (s *Server) trySyncMappings(ctx context.Context, client proto.ProxyServiceClient, initialSyncDone *bool) error { + connectTime := time.Now() stream, err := client.SyncMappings(ctx) if err != nil { return fmt.Errorf("create sync stream: %w", err) @@ -1044,7 +1047,7 @@ func (s *Server) trySyncMappings(ctx context.Context, client proto.ProxyServiceC } s.Logger.Debug("management mapping stream established (SyncMappings)") - return 
s.handleSyncMappingsStream(ctx, stream, initialSyncDone) + return s.handleSyncMappingsStream(ctx, stream, initialSyncDone, connectTime) } func isSyncUnimplemented(err error) bool { @@ -1055,7 +1058,7 @@ func isSyncUnimplemented(err error) bool { return ok && st.Code() == codes.Unimplemented } -func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool) error { +func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool, connectTime time.Time) error { select { case <-s.routerReady: case <-ctx.Done(): @@ -1079,10 +1082,16 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox case err != nil: return fmt.Errorf("receive msg: %w", err) } + + batchStart := time.Now() s.Logger.Debug("Received mapping update, starting processing") s.processMappings(ctx, msg.GetMapping()) s.Logger.Debug("Processing mapping update completed") + if !*initialSyncDone && s.meter != nil { + s.meter.RecordSnapshotBatchDuration(time.Since(batchStart)) + } + // Send ack so management knows we're ready for the next batch. 
if err := stream.Send(&proto.SyncMappingsRequest{ Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}}, @@ -1101,6 +1110,9 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox s.healthChecker.SetInitialSyncComplete() } *initialSyncDone = true + if s.meter != nil { + s.meter.RecordSnapshotSyncDuration(time.Since(connectTime)) + } s.Logger.Info("Initial mapping sync complete") } } @@ -1108,7 +1120,7 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox } } -func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool) error { +func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool, connectTime time.Time) error { select { case <-s.routerReady: case <-ctx.Done(): @@ -1136,10 +1148,16 @@ func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.Pr // Something has gone horribly wrong, return and hope the parent retries the connection. 
return fmt.Errorf("receive msg: %w", err) } + + batchStart := time.Now() s.Logger.Debug("Received mapping update, starting processing") s.processMappings(ctx, msg.GetMapping()) s.Logger.Debug("Processing mapping update completed") + if !*initialSyncDone && s.meter != nil { + s.meter.RecordSnapshotBatchDuration(time.Since(batchStart)) + } + if !*initialSyncDone { for _, m := range msg.GetMapping() { snapshotIDs[types.ServiceID(m.GetId())] = struct{}{} @@ -1151,6 +1169,9 @@ func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.Pr s.healthChecker.SetInitialSyncComplete() } *initialSyncDone = true + if s.meter != nil { + s.meter.RecordSnapshotSyncDuration(time.Since(connectTime)) + } s.Logger.Info("Initial mapping sync complete") } } diff --git a/proxy/snapshot_reconcile_test.go b/proxy/snapshot_reconcile_test.go index 042d8df77..2e6c80cfc 100644 --- a/proxy/snapshot_reconcile_test.go +++ b/proxy/snapshot_reconcile_test.go @@ -4,6 +4,7 @@ import ( "context" "io" "testing" + "time" log "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" @@ -139,7 +140,7 @@ func TestHandleMappingStream_BatchedSnapshotSyncComplete(t *testing.T) { } syncDone := false - err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) assert.NoError(t, err) assert.True(t, syncDone, "sync should be marked done after final batch") } @@ -164,7 +165,7 @@ func TestHandleMappingStream_PostSyncDoesNotReconcile(t *testing.T) { } syncDone := true // sync already completed in a previous stream - err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) require.NoError(t, err) assert.Len(t, s.lastMappings, 2, @@ -185,7 +186,7 @@ func TestHandleMappingStream_ImmediateEOF_NoReconciliation(t *testing.T) { stream := &mockMappingStream{} // no messages → immediate EOF syncDone := false 
- err := s.handleMappingStream(context.Background(), stream, &syncDone) + err := s.handleMappingStream(context.Background(), stream, &syncDone, time.Time{}) assert.NoError(t, err) assert.False(t, syncDone, "sync should not be marked done on immediate EOF") @@ -218,7 +219,7 @@ func TestHandleMappingStream_ErrorMidSync_NoReconciliation(t *testing.T) { s.lastMappings["svc-stale"] = &proto.ProxyMapping{Id: "svc-stale", AccountId: "acct-1"} syncDone := false - err := s.handleMappingStream(context.Background(), &mockErrRecvStream{}, &syncDone) + err := s.handleMappingStream(context.Background(), &mockErrRecvStream{}, &syncDone, time.Time{}) assert.Error(t, err) assert.False(t, syncDone)