From af24fd779640538c05c5f261a1e9fdf20fe7773f Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 18 May 2026 22:55:19 +0200 Subject: [PATCH] [management] Add metrics for peer status updates and ephemeral cleanup (#6196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [management] Add metrics for peer status updates and ephemeral cleanup The session-fenced MarkPeerConnected / MarkPeerDisconnected path and the ephemeral peer cleanup loop both run silently today: when fencing rejects a stale stream, when a cleanup tick deletes peers, or when a batch delete fails, we have no operational signal beyond log lines. Add OpenTelemetry counters and a histogram so the same SLO-style dashboards that already exist for the network-map controller can cover peer connect/disconnect and ephemeral cleanup too. All new attributes are bounded enums: operation in {connect,disconnect} and outcome in {applied,stale,error,peer_not_found}. No account, peer, or user ID is ever written as a metric label — total cardinality is fixed at compile time (8 counter series, 2 histogram series, 4 unlabeled ephemeral series). Metric methods are nil-receiver safe so test composition that doesn't wire telemetry (the bulk of the existing tests) works unchanged. The ephemeral manager exposes a SetMetrics setter rather than taking the collector through its constructor, keeping the constructor signature stable across all test call sites. * [management] Add OpenTelemetry metrics for ephemeral peer cleanup Introduce counters for tracking ephemeral peer cleanup, including peers pending deletion, cleanup runs, successful deletions, and failed batches. Metrics are nil-receiver safe to ensure compatibility with test setups without telemetry. --- .../peers/ephemeral/manager/ephemeral.go | 45 ++++++- management/internals/server/controllers.go | 6 +- management/server/peer.go | 28 +++++ .../telemetry/accountmanager_metrics.go | 65 ++++++++++ management/server/telemetry/app_metrics.go | 28 +++++ .../server/telemetry/ephemeral_metrics.go | 115 ++++++++++++++++++ 6 files changed, 281 insertions(+), 6 deletions(-) create mode 100644 management/server/telemetry/ephemeral_metrics.go diff --git a/management/internals/modules/peers/ephemeral/manager/ephemeral.go b/management/internals/modules/peers/ephemeral/manager/ephemeral.go index 758f643d0..0f902ea70 100644 --- a/management/internals/modules/peers/ephemeral/manager/ephemeral.go +++ b/management/internals/modules/peers/ephemeral/manager/ephemeral.go @@ -11,6 +11,7 @@ import ( "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral" "github.com/netbirdio/netbird/management/server/activity" nbpeer "github.com/netbirdio/netbird/management/server/peer" + "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/store" ) @@ -47,6 +48,11 @@ type EphemeralManager struct { lifeTime time.Duration cleanupWindow time.Duration + + // metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics + // no-op when the receiver is nil so deployments without an app + // metrics provider work unchanged. + metrics *telemetry.EphemeralPeersMetrics } // NewEphemeralManager instantiate new EphemeralManager @@ -60,6 +66,15 @@ func NewEphemeralManager(store store.Store, peersManager peers.Manager) *Ephemer } } +// SetMetrics attaches a metrics collector. Safe to call once before +// LoadInitialPeers; later attachment is fine but earlier loads won't be +// reflected in the gauge. Pass nil to detach. +func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) { + e.peersLock.Lock() + e.metrics = m + e.peersLock.Unlock() +} + // LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head // of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new // head. @@ -97,7 +112,9 @@ func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Pee e.peersLock.Lock() defer e.peersLock.Unlock() - e.removePeer(peer.ID) + if e.removePeer(peer.ID) { + e.metrics.DecPending(1) + } // stop the unnecessary timer if e.headPeer == nil && e.timer != nil { @@ -123,6 +140,7 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer. } e.addPeer(peer.AccountID, peer.ID, e.newDeadLine()) + e.metrics.IncPending() if e.timer == nil { delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow if delay < 0 { @@ -145,6 +163,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) { for _, p := range peers { e.addPeer(p.AccountID, p.ID, t) } + e.metrics.AddPending(int64(len(peers))) log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers)) } @@ -181,6 +200,15 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { e.peersLock.Unlock() + // Drop the gauge by the number of entries we just took off the list, + // regardless of whether the subsequent DeletePeers call succeeds. The + // list invariant is what the gauge tracks; failed delete batches are + // counted separately via CountCleanupError so we can still see them. + if len(deletePeers) > 0 { + e.metrics.CountCleanupRun() + e.metrics.DecPending(int64(len(deletePeers))) + } + peerIDsPerAccount := make(map[string][]string) for id, p := range deletePeers { peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id) @@ -191,7 +219,10 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true) if err != nil { log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err) + e.metrics.CountCleanupError() + continue } + e.metrics.CountPeersCleaned(int64(len(peerIDs))) } } @@ -211,9 +242,12 @@ func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline tim e.tailPeer = ep } -func (e *EphemeralManager) removePeer(id string) { +// removePeer drops the entry from the linked list. Returns true if a +// matching entry was found and removed so callers can keep the pending +// metric gauge in sync. +func (e *EphemeralManager) removePeer(id string) bool { if e.headPeer == nil { - return + return false } if e.headPeer.id == id { @@ -221,7 +255,7 @@ func (e *EphemeralManager) removePeer(id string) { if e.tailPeer.id == id { e.tailPeer = nil } - return + return true } for p := e.headPeer; p.next != nil; p = p.next { @@ -231,9 +265,10 @@ func (e *EphemeralManager) removePeer(id string) { e.tailPeer = p } p.next = p.next.next - return + return true } } + return false } func (e *EphemeralManager) isPeerOnList(id string) bool { diff --git a/management/internals/server/controllers.go b/management/internals/server/controllers.go index 89bdf0abe..794c3ebe0 100644 --- a/management/internals/server/controllers.go +++ b/management/internals/server/controllers.go @@ -112,7 +112,11 @@ func (s *BaseServer) AuthManager() auth.Manager { func (s *BaseServer) EphemeralManager() ephemeral.Manager { return Create(s, func() ephemeral.Manager { - return manager.NewEphemeralManager(s.Store(), s.PeersManager()) + em := manager.NewEphemeralManager(s.Store(), s.PeersManager()) + if metrics := s.Metrics(); metrics != nil { + em.SetMetrics(metrics.EphemeralPeersMetrics()) + } + return em }) } diff --git a/management/server/peer.go b/management/server/peer.go index 4790a5aab..34b681f51 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -28,6 +28,7 @@ import ( "github.com/netbirdio/netbird/management/server/activity" nbpeer "github.com/netbirdio/netbird/management/server/peer" + "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/shared/management/status" ) @@ -72,19 +73,32 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID // Disconnects use MarkPeerDisconnected and require the session to match // exactly; see PeerStatus.SessionStartedAt for the protocol. func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64) error { + start := time.Now() + defer func() { + am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusConnect, time.Since(start)) + }() + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { + outcome := telemetry.PeerStatusError + if s, ok := status.FromError(err); ok && s.Type() == status.NotFound { + outcome = telemetry.PeerStatusPeerNotFound + } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, outcome) return err } updated, err := am.Store.MarkPeerConnectedIfNewerSession(ctx, accountID, peer.ID, sessionStartedAt) if err != nil { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusError) return err } if !updated { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusStale) log.WithContext(ctx).Tracef("peer %s already has a newer session in store, skipping connect", peer.ID) return nil } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusApplied) if am.geo != nil && realIP != nil { am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP) @@ -119,19 +133,33 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK // newer stream has already taken ownership of the peer — disconnects from // the older stream are ignored. LastSeen is written by the database. func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64) error { + start := time.Now() + defer func() { + am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusDisconnect, time.Since(start)) + }() + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { + outcome := telemetry.PeerStatusError + if s, ok := status.FromError(err); ok && s.Type() == status.NotFound { + outcome = telemetry.PeerStatusPeerNotFound + } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, outcome) return err } updated, err := am.Store.MarkPeerDisconnectedIfSameSession(ctx, accountID, peer.ID, sessionStartedAt) if err != nil { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusError) return err } if !updated { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusStale) log.WithContext(ctx).Tracef("peer %s session token mismatch on disconnect (token=%d), skipping", peer.ID, sessionStartedAt) + return nil } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusApplied) return nil } diff --git a/management/server/telemetry/accountmanager_metrics.go b/management/server/telemetry/accountmanager_metrics.go index 518aae7eb..bb6fb7e12 100644 --- a/management/server/telemetry/accountmanager_metrics.go +++ b/management/server/telemetry/accountmanager_metrics.go @@ -16,6 +16,8 @@ type AccountManagerMetrics struct { getPeerNetworkMapDurationMs metric.Float64Histogram networkMapObjectCount metric.Int64Histogram peerMetaUpdateCount metric.Int64Counter + peerStatusUpdateCounter metric.Int64Counter + peerStatusUpdateDurationMs metric.Float64Histogram } // NewAccountManagerMetrics creates an instance of AccountManagerMetrics @@ -64,6 +66,24 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account return nil, err } + // peerStatusUpdateCounter records every attempt to mark a peer as connected or disconnected + peerStatusUpdateCounter, err := meter.Int64Counter("management.account.peer.status.update.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of peer status update attempts, labeled by operation (connect|disconnect) and outcome (applied|stale|error|peer_not_found)")) + if err != nil { + return nil, err + } + + peerStatusUpdateDurationMs, err := meter.Float64Histogram("management.account.peer.status.update.duration.ms", + metric.WithUnit("milliseconds"), + metric.WithExplicitBucketBoundaries( + 1, 5, 15, 25, 50, 100, 250, 500, 1000, 2000, 5000, + ), + metric.WithDescription("Duration of a peer status update (fence UPDATE + post-write side effects), labeled by operation")) + if err != nil { + return nil, err + } + return &AccountManagerMetrics{ ctx: ctx, getPeerNetworkMapDurationMs: getPeerNetworkMapDurationMs, @@ -71,10 +91,35 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account updateAccountPeersCounter: updateAccountPeersCounter, networkMapObjectCount: networkMapObjectCount, peerMetaUpdateCount: peerMetaUpdateCount, + peerStatusUpdateCounter: peerStatusUpdateCounter, + peerStatusUpdateDurationMs: peerStatusUpdateDurationMs, }, nil } +// PeerStatusOperation labels the kind of fence-locked peer status write. +type PeerStatusOperation string + +// PeerStatusOutcome labels how a fence-locked peer status write resolved. +type PeerStatusOutcome string + +const ( + PeerStatusConnect PeerStatusOperation = "connect" + PeerStatusDisconnect PeerStatusOperation = "disconnect" + + // PeerStatusApplied — the fence WHERE matched and the UPDATE landed. + PeerStatusApplied PeerStatusOutcome = "applied" + // PeerStatusStale — the fence WHERE rejected the write because a + // newer session has already taken ownership (connect: stored token + // >= incoming; disconnect: stored token != incoming). + PeerStatusStale PeerStatusOutcome = "stale" + // PeerStatusError — the store returned a non-NotFound error. + PeerStatusError PeerStatusOutcome = "error" + // PeerStatusPeerNotFound — the peer lookup failed (the peer was + // deleted between the gRPC sync handshake and the status write). + PeerStatusPeerNotFound PeerStatusOutcome = "peer_not_found" +) + // CountUpdateAccountPeersDuration counts the duration of updating account peers func (metrics *AccountManagerMetrics) CountUpdateAccountPeersDuration(duration time.Duration) { metrics.updateAccountPeersDurationMs.Record(metrics.ctx, float64(duration.Nanoseconds())/1e6) @@ -104,3 +149,23 @@ func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource, func (metrics *AccountManagerMetrics) CountPeerMetUpdate() { metrics.peerMetaUpdateCount.Add(metrics.ctx, 1) } + +// CountPeerStatusUpdate increments the connect/disconnect counter, +// labeled by operation and outcome. Both labels are bounded enums. +func (metrics *AccountManagerMetrics) CountPeerStatusUpdate(op PeerStatusOperation, outcome PeerStatusOutcome) { + metrics.peerStatusUpdateCounter.Add(metrics.ctx, 1, + metric.WithAttributes( + attribute.String("operation", string(op)), + attribute.String("outcome", string(outcome)), + ), + ) +} + +// RecordPeerStatusUpdateDuration records the wall-clock time spent +// running a peer status update (including post-write side effects), +// labeled by operation. +func (metrics *AccountManagerMetrics) RecordPeerStatusUpdateDuration(op PeerStatusOperation, d time.Duration) { + metrics.peerStatusUpdateDurationMs.Record(metrics.ctx, float64(d.Nanoseconds())/1e6, + metric.WithAttributes(attribute.String("operation", string(op))), + ) +} diff --git a/management/server/telemetry/app_metrics.go b/management/server/telemetry/app_metrics.go index 1fd78bc3a..fd9087a96 100644 --- a/management/server/telemetry/app_metrics.go +++ b/management/server/telemetry/app_metrics.go @@ -29,6 +29,7 @@ type MockAppMetrics struct { StoreMetricsFunc func() *StoreMetrics UpdateChannelMetricsFunc func() *UpdateChannelMetrics AddAccountManagerMetricsFunc func() *AccountManagerMetrics + EphemeralPeersMetricsFunc func() *EphemeralPeersMetrics } // GetMeter mocks the GetMeter function of the AppMetrics interface @@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics { return nil } +// EphemeralPeersMetrics mocks the MockAppMetrics function of the EphemeralPeersMetrics interface +func (mock *MockAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics { + if mock.EphemeralPeersMetricsFunc != nil { + return mock.EphemeralPeersMetricsFunc() + } + return nil +} + // AppMetrics is metrics interface type AppMetrics interface { GetMeter() metric2.Meter @@ -114,6 +123,7 @@ type AppMetrics interface { StoreMetrics() *StoreMetrics UpdateChannelMetrics() *UpdateChannelMetrics AccountManagerMetrics() *AccountManagerMetrics + EphemeralPeersMetrics() *EphemeralPeersMetrics } // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ @@ -129,6 +139,7 @@ type defaultAppMetrics struct { storeMetrics *StoreMetrics updateChannelMetrics *UpdateChannelMetrics accountManagerMetrics *AccountManagerMetrics + ephemeralMetrics *EphemeralPeersMetrics } // IDPMetrics returns metrics for the idp package @@ -161,6 +172,11 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr return appMetrics.accountManagerMetrics } +// EphemeralPeersMetrics returns metrics for the ephemeral peer cleanup loop +func (appMetrics *defaultAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics { + return appMetrics.ephemeralMetrics +} + // Close stop application metrics HTTP handler and closes listener. func (appMetrics *defaultAppMetrics) Close() error { if appMetrics.listener == nil { @@ -245,6 +261,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err) } + ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter) + if err != nil { + return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err) + } + return &defaultAppMetrics{ Meter: meter, ctx: ctx, @@ -254,6 +275,7 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { storeMetrics: storeMetrics, updateChannelMetrics: updateChannelMetrics, accountManagerMetrics: accountManagerMetrics, + ephemeralMetrics: ephemeralMetrics, }, nil } @@ -290,6 +312,11 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err) } + ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter) + if err != nil { + return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err) + } + return &defaultAppMetrics{ Meter: meter, ctx: ctx, @@ -300,5 +327,6 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric storeMetrics: storeMetrics, updateChannelMetrics: updateChannelMetrics, accountManagerMetrics: accountManagerMetrics, + ephemeralMetrics: ephemeralMetrics, }, nil } diff --git a/management/server/telemetry/ephemeral_metrics.go b/management/server/telemetry/ephemeral_metrics.go new file mode 100644 index 000000000..a7fb432f8 --- /dev/null +++ b/management/server/telemetry/ephemeral_metrics.go @@ -0,0 +1,115 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/metric" +) + +// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how +// many peers are currently scheduled for deletion, how many tick runs +// the cleaner has performed, how many peers it has removed, and how +// many delete batches failed. +type EphemeralPeersMetrics struct { + ctx context.Context + + pending metric.Int64UpDownCounter + cleanupRuns metric.Int64Counter + peersCleaned metric.Int64Counter + errors metric.Int64Counter +} + +// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters. +func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) { + pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up")) + if err != nil { + return nil, err + } + + cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer")) + if err != nil { + return nil, err + } + + peersCleaned, err := meter.Int64Counter("management.ephemeral.peers.cleaned.counter", + metric.WithUnit("1"), + metric.WithDescription("Total number of ephemeral peers deleted by the cleanup loop")) + if err != nil { + return nil, err + } + + errors, err := meter.Int64Counter("management.ephemeral.cleanup.errors.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral cleanup batches (per account) that failed to delete")) + if err != nil { + return nil, err + } + + return &EphemeralPeersMetrics{ + ctx: ctx, + pending: pending, + cleanupRuns: cleanupRuns, + peersCleaned: peersCleaned, + errors: errors, + }, nil +} + +// All methods are nil-receiver safe so callers that haven't wired metrics +// (tests, self-hosted with metrics off) can invoke them unconditionally. + +// IncPending bumps the pending gauge when a peer is added to the cleanup list. +func (m *EphemeralPeersMetrics) IncPending() { + if m == nil { + return + } + m.pending.Add(m.ctx, 1) +} + +// AddPending bumps the pending gauge by n — used at startup when the +// initial set of ephemeral peers is loaded from the store. +func (m *EphemeralPeersMetrics) AddPending(n int64) { + if m == nil || n <= 0 { + return + } + m.pending.Add(m.ctx, n) +} + +// DecPending decreases the pending gauge — used both when a peer reconnects +// before its deadline (removed from the list) and when a cleanup tick +// actually deletes it. +func (m *EphemeralPeersMetrics) DecPending(n int64) { + if m == nil || n <= 0 { + return + } + m.pending.Add(m.ctx, -n) +} + +// CountCleanupRun records one cleanup pass that processed >0 peers. Idle +// ticks (nothing to do) deliberately don't increment so the rate +// reflects useful work. +func (m *EphemeralPeersMetrics) CountCleanupRun() { + if m == nil { + return + } + m.cleanupRuns.Add(m.ctx, 1) +} + +// CountPeersCleaned records the number of peers a single tick deleted. +func (m *EphemeralPeersMetrics) CountPeersCleaned(n int64) { + if m == nil || n <= 0 { + return + } + m.peersCleaned.Add(m.ctx, n) +} + +// CountCleanupError records a failed delete batch. +func (m *EphemeralPeersMetrics) CountCleanupError() { + if m == nil { + return + } + m.errors.Add(m.ctx, 1) +}