[management] Add metrics for peer status updates and ephemeral cleanup (#6196)

* [management] Add metrics for peer status updates and ephemeral cleanup

The session-fenced MarkPeerConnected / MarkPeerDisconnected path and
the ephemeral peer cleanup loop both run silently today: when fencing
rejects a stale stream, when a cleanup tick deletes peers, or when a
batch delete fails, we have no operational signal beyond log lines.

Add OpenTelemetry counters and a histogram so the same SLO-style
dashboards that already exist for the network-map controller can cover
peer connect/disconnect and ephemeral cleanup too.

All new attributes are bounded enums: operation in {connect,disconnect}
and outcome in {applied,stale,error,peer_not_found}. No account, peer,
or user ID is ever written as a metric label — total cardinality is
fixed at compile time (8 counter series, 2 histogram series, 4 unlabeled
ephemeral series).

Metric methods are nil-receiver safe so test composition that doesn't
wire telemetry (the bulk of the existing tests) works unchanged. The
ephemeral manager exposes a SetMetrics setter rather than taking the
collector through its constructor, keeping the constructor signature
stable across all test call sites.

* [management] Add OpenTelemetry metrics for ephemeral peer cleanup

Introduce counters for tracking ephemeral peer cleanup, including peers pending deletion, cleanup runs, successful deletions, and failed batches. Metrics are nil-receiver safe to ensure compatibility with test setups without telemetry.
This commit is contained in:
Maycon Santos
2026-05-18 22:55:19 +02:00
committed by GitHub
parent 13d32d274f
commit af24fd7796
6 changed files with 281 additions and 6 deletions

View File

@@ -11,6 +11,7 @@ import (
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
"github.com/netbirdio/netbird/management/server/activity"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/store"
)
@@ -47,6 +48,11 @@ type EphemeralManager struct {
lifeTime time.Duration
cleanupWindow time.Duration
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
// no-op when the receiver is nil so deployments without an app
// metrics provider work unchanged.
metrics *telemetry.EphemeralPeersMetrics
}
// NewEphemeralManager instantiate new EphemeralManager
@@ -60,6 +66,15 @@ func NewEphemeralManager(store store.Store, peersManager peers.Manager) *Ephemer
}
}
// SetMetrics attaches a metrics collector. Safe to call once before
// LoadInitialPeers; later attachment is fine but earlier loads won't be
// reflected in the gauge. Pass nil to detach.
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
e.peersLock.Lock()
e.metrics = m
e.peersLock.Unlock()
}
// LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head
// of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new
// head.
@@ -97,7 +112,9 @@ func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Pee
e.peersLock.Lock()
defer e.peersLock.Unlock()
e.removePeer(peer.ID)
if e.removePeer(peer.ID) {
e.metrics.DecPending(1)
}
// stop the unnecessary timer
if e.headPeer == nil && e.timer != nil {
@@ -123,6 +140,7 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.
}
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
e.metrics.IncPending()
if e.timer == nil {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
@@ -145,6 +163,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
for _, p := range peers {
e.addPeer(p.AccountID, p.ID, t)
}
e.metrics.AddPending(int64(len(peers)))
log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers))
}
@@ -181,6 +200,15 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
e.peersLock.Unlock()
// Drop the gauge by the number of entries we just took off the list,
// regardless of whether the subsequent DeletePeers call succeeds. The
// list invariant is what the gauge tracks; failed delete batches are
// counted separately via CountCleanupError so we can still see them.
if len(deletePeers) > 0 {
e.metrics.CountCleanupRun()
e.metrics.DecPending(int64(len(deletePeers)))
}
peerIDsPerAccount := make(map[string][]string)
for id, p := range deletePeers {
peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id)
@@ -191,7 +219,10 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true)
if err != nil {
log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err)
e.metrics.CountCleanupError()
continue
}
e.metrics.CountPeersCleaned(int64(len(peerIDs)))
}
}
@@ -211,9 +242,12 @@ func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline tim
e.tailPeer = ep
}
func (e *EphemeralManager) removePeer(id string) {
// removePeer drops the entry from the linked list. Returns true if a
// matching entry was found and removed so callers can keep the pending
// metric gauge in sync.
func (e *EphemeralManager) removePeer(id string) bool {
if e.headPeer == nil {
return
return false
}
if e.headPeer.id == id {
@@ -221,7 +255,7 @@ func (e *EphemeralManager) removePeer(id string) {
if e.tailPeer.id == id {
e.tailPeer = nil
}
return
return true
}
for p := e.headPeer; p.next != nil; p = p.next {
@@ -231,9 +265,10 @@ func (e *EphemeralManager) removePeer(id string) {
e.tailPeer = p
}
p.next = p.next.next
return
return true
}
}
return false
}
func (e *EphemeralManager) isPeerOnList(id string) bool {

View File

@@ -112,7 +112,11 @@ func (s *BaseServer) AuthManager() auth.Manager {
func (s *BaseServer) EphemeralManager() ephemeral.Manager {
return Create(s, func() ephemeral.Manager {
return manager.NewEphemeralManager(s.Store(), s.PeersManager())
em := manager.NewEphemeralManager(s.Store(), s.PeersManager())
if metrics := s.Metrics(); metrics != nil {
em.SetMetrics(metrics.EphemeralPeersMetrics())
}
return em
})
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -72,19 +73,32 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
// Disconnects use MarkPeerDisconnected and require the session to match
// exactly; see PeerStatus.SessionStartedAt for the protocol.
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64) error {
start := time.Now()
defer func() {
am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusConnect, time.Since(start))
}()
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
if err != nil {
outcome := telemetry.PeerStatusError
if s, ok := status.FromError(err); ok && s.Type() == status.NotFound {
outcome = telemetry.PeerStatusPeerNotFound
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, outcome)
return err
}
updated, err := am.Store.MarkPeerConnectedIfNewerSession(ctx, accountID, peer.ID, sessionStartedAt)
if err != nil {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusError)
return err
}
if !updated {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusStale)
log.WithContext(ctx).Tracef("peer %s already has a newer session in store, skipping connect", peer.ID)
return nil
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusApplied)
if am.geo != nil && realIP != nil {
am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP)
@@ -119,19 +133,33 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
// newer stream has already taken ownership of the peer — disconnects from
// the older stream are ignored. LastSeen is written by the database.
func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64) error {
start := time.Now()
defer func() {
am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusDisconnect, time.Since(start))
}()
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
if err != nil {
outcome := telemetry.PeerStatusError
if s, ok := status.FromError(err); ok && s.Type() == status.NotFound {
outcome = telemetry.PeerStatusPeerNotFound
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, outcome)
return err
}
updated, err := am.Store.MarkPeerDisconnectedIfSameSession(ctx, accountID, peer.ID, sessionStartedAt)
if err != nil {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusError)
return err
}
if !updated {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusStale)
log.WithContext(ctx).Tracef("peer %s session token mismatch on disconnect (token=%d), skipping",
peer.ID, sessionStartedAt)
return nil
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusApplied)
return nil
}

View File

@@ -16,6 +16,8 @@ type AccountManagerMetrics struct {
getPeerNetworkMapDurationMs metric.Float64Histogram
networkMapObjectCount metric.Int64Histogram
peerMetaUpdateCount metric.Int64Counter
peerStatusUpdateCounter metric.Int64Counter
peerStatusUpdateDurationMs metric.Float64Histogram
}
// NewAccountManagerMetrics creates an instance of AccountManagerMetrics
@@ -64,6 +66,24 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
return nil, err
}
// peerStatusUpdateCounter records every attempt to mark a peer as connected or disconnected
peerStatusUpdateCounter, err := meter.Int64Counter("management.account.peer.status.update.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of peer status update attempts, labeled by operation (connect|disconnect) and outcome (applied|stale|error|peer_not_found)"))
if err != nil {
return nil, err
}
peerStatusUpdateDurationMs, err := meter.Float64Histogram("management.account.peer.status.update.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithExplicitBucketBoundaries(
1, 5, 15, 25, 50, 100, 250, 500, 1000, 2000, 5000,
),
metric.WithDescription("Duration of a peer status update (fence UPDATE + post-write side effects), labeled by operation"))
if err != nil {
return nil, err
}
return &AccountManagerMetrics{
ctx: ctx,
getPeerNetworkMapDurationMs: getPeerNetworkMapDurationMs,
@@ -71,10 +91,35 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
updateAccountPeersCounter: updateAccountPeersCounter,
networkMapObjectCount: networkMapObjectCount,
peerMetaUpdateCount: peerMetaUpdateCount,
peerStatusUpdateCounter: peerStatusUpdateCounter,
peerStatusUpdateDurationMs: peerStatusUpdateDurationMs,
}, nil
}
// PeerStatusOperation labels the kind of fence-locked peer status write.
type PeerStatusOperation string
// PeerStatusOutcome labels how a fence-locked peer status write resolved.
type PeerStatusOutcome string
const (
PeerStatusConnect PeerStatusOperation = "connect"
PeerStatusDisconnect PeerStatusOperation = "disconnect"
// PeerStatusApplied — the fence WHERE matched and the UPDATE landed.
PeerStatusApplied PeerStatusOutcome = "applied"
// PeerStatusStale — the fence WHERE rejected the write because a
// newer session has already taken ownership (connect: stored token
// >= incoming; disconnect: stored token != incoming).
PeerStatusStale PeerStatusOutcome = "stale"
// PeerStatusError — the store returned a non-NotFound error.
PeerStatusError PeerStatusOutcome = "error"
// PeerStatusPeerNotFound — the peer lookup failed (the peer was
// deleted between the gRPC sync handshake and the status write).
PeerStatusPeerNotFound PeerStatusOutcome = "peer_not_found"
)
// CountUpdateAccountPeersDuration counts the duration of updating account peers
func (metrics *AccountManagerMetrics) CountUpdateAccountPeersDuration(duration time.Duration) {
metrics.updateAccountPeersDurationMs.Record(metrics.ctx, float64(duration.Nanoseconds())/1e6)
@@ -104,3 +149,23 @@ func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource,
func (metrics *AccountManagerMetrics) CountPeerMetUpdate() {
metrics.peerMetaUpdateCount.Add(metrics.ctx, 1)
}
// CountPeerStatusUpdate increments the connect/disconnect counter,
// labeled by operation and outcome. Both labels are bounded enums.
func (metrics *AccountManagerMetrics) CountPeerStatusUpdate(op PeerStatusOperation, outcome PeerStatusOutcome) {
metrics.peerStatusUpdateCounter.Add(metrics.ctx, 1,
metric.WithAttributes(
attribute.String("operation", string(op)),
attribute.String("outcome", string(outcome)),
),
)
}
// RecordPeerStatusUpdateDuration records the wall-clock time spent
// running a peer status update (including post-write side effects),
// labeled by operation.
func (metrics *AccountManagerMetrics) RecordPeerStatusUpdateDuration(op PeerStatusOperation, d time.Duration) {
metrics.peerStatusUpdateDurationMs.Record(metrics.ctx, float64(d.Nanoseconds())/1e6,
metric.WithAttributes(attribute.String("operation", string(op))),
)
}

View File

@@ -29,6 +29,7 @@ type MockAppMetrics struct {
StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
AddAccountManagerMetricsFunc func() *AccountManagerMetrics
EphemeralPeersMetricsFunc func() *EphemeralPeersMetrics
}
// GetMeter mocks the GetMeter function of the AppMetrics interface
@@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics {
return nil
}
// EphemeralPeersMetrics mocks the MockAppMetrics function of the EphemeralPeersMetrics interface
func (mock *MockAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics {
if mock.EphemeralPeersMetricsFunc != nil {
return mock.EphemeralPeersMetricsFunc()
}
return nil
}
// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
@@ -114,6 +123,7 @@ type AppMetrics interface {
StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
AccountManagerMetrics() *AccountManagerMetrics
EphemeralPeersMetrics() *EphemeralPeersMetrics
}
// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
@@ -129,6 +139,7 @@ type defaultAppMetrics struct {
storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
accountManagerMetrics *AccountManagerMetrics
ephemeralMetrics *EphemeralPeersMetrics
}
// IDPMetrics returns metrics for the idp package
@@ -161,6 +172,11 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr
return appMetrics.accountManagerMetrics
}
// EphemeralPeersMetrics returns metrics for the ephemeral peer cleanup loop
func (appMetrics *defaultAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics {
return appMetrics.ephemeralMetrics
}
// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
@@ -245,6 +261,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err)
}
ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter)
if err != nil {
return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err)
}
return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
@@ -254,6 +275,7 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
accountManagerMetrics: accountManagerMetrics,
ephemeralMetrics: ephemeralMetrics,
}, nil
}
@@ -290,6 +312,11 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric
return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err)
}
ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter)
if err != nil {
return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err)
}
return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
@@ -300,5 +327,6 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
accountManagerMetrics: accountManagerMetrics,
ephemeralMetrics: ephemeralMetrics,
}, nil
}

View File

@@ -0,0 +1,115 @@
package telemetry
import (
"context"
"go.opentelemetry.io/otel/metric"
)
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
// many peers are currently scheduled for deletion, how many tick runs
// the cleaner has performed, how many peers it has removed, and how
// many delete batches failed.
type EphemeralPeersMetrics struct {
ctx context.Context
pending metric.Int64UpDownCounter
cleanupRuns metric.Int64Counter
peersCleaned metric.Int64Counter
errors metric.Int64Counter
}
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up"))
if err != nil {
return nil, err
}
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer"))
if err != nil {
return nil, err
}
peersCleaned, err := meter.Int64Counter("management.ephemeral.peers.cleaned.counter",
metric.WithUnit("1"),
metric.WithDescription("Total number of ephemeral peers deleted by the cleanup loop"))
if err != nil {
return nil, err
}
errors, err := meter.Int64Counter("management.ephemeral.cleanup.errors.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral cleanup batches (per account) that failed to delete"))
if err != nil {
return nil, err
}
return &EphemeralPeersMetrics{
ctx: ctx,
pending: pending,
cleanupRuns: cleanupRuns,
peersCleaned: peersCleaned,
errors: errors,
}, nil
}
// All methods are nil-receiver safe so callers that haven't wired metrics
// (tests, self-hosted with metrics off) can invoke them unconditionally.
// IncPending bumps the pending gauge when a peer is added to the cleanup list.
func (m *EphemeralPeersMetrics) IncPending() {
if m == nil {
return
}
m.pending.Add(m.ctx, 1)
}
// AddPending bumps the pending gauge by n — used at startup when the
// initial set of ephemeral peers is loaded from the store.
func (m *EphemeralPeersMetrics) AddPending(n int64) {
if m == nil || n <= 0 {
return
}
m.pending.Add(m.ctx, n)
}
// DecPending decreases the pending gauge — used both when a peer reconnects
// before its deadline (removed from the list) and when a cleanup tick
// actually deletes it.
func (m *EphemeralPeersMetrics) DecPending(n int64) {
if m == nil || n <= 0 {
return
}
m.pending.Add(m.ctx, -n)
}
// CountCleanupRun records one cleanup pass that processed >0 peers. Idle
// ticks (nothing to do) deliberately don't increment so the rate
// reflects useful work.
func (m *EphemeralPeersMetrics) CountCleanupRun() {
if m == nil {
return
}
m.cleanupRuns.Add(m.ctx, 1)
}
// CountPeersCleaned records the number of peers a single tick deleted.
func (m *EphemeralPeersMetrics) CountPeersCleaned(n int64) {
if m == nil || n <= 0 {
return
}
m.peersCleaned.Add(m.ctx, n)
}
// CountCleanupError records a failed delete batch.
func (m *EphemeralPeersMetrics) CountCleanupError() {
if m == nil {
return
}
m.errors.Add(m.ctx, 1)
}