mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-23 17:19:54 +00:00
Compare commits
1 Commits
ui-refacto
...
refactor/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5eb28acb11 |
@@ -2,6 +2,7 @@ package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -11,44 +12,76 @@ import (
|
||||
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||
"github.com/netbirdio/netbird/management/server/activity"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
)
|
||||
|
||||
const (
|
||||
// cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure.
|
||||
// cleanupWindow is the small grace period added on top of the
|
||||
// staleness horizon before a sweep fires. It absorbs minor clock
|
||||
// skew between the management server and the database and avoids
|
||||
// firing a sweep right at the boundary where last_seen could still
|
||||
// be one tick under the threshold.
|
||||
cleanupWindow = 1 * time.Minute
|
||||
|
||||
// initialLoadMinDelay and initialLoadMaxDelay bracket the random
|
||||
// delay applied before the post-restart catch-up query runs. Spread
|
||||
// across replicas this prevents a thundering herd of catch-up
|
||||
// queries hitting the database simultaneously after a deploy.
|
||||
initialLoadMinDelay = 8 * time.Minute
|
||||
initialLoadMaxDelay = 10 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
timeNow = time.Now
|
||||
)
|
||||
|
||||
type ephemeralPeer struct {
|
||||
id string
|
||||
accountID string
|
||||
deadline time.Time
|
||||
next *ephemeralPeer
|
||||
// accountEntry is the per-account state held by the cleanup tracker.
|
||||
// We don't track which peers are pending — the sweep query gets the
|
||||
// authoritative list straight from the database every time. We only
|
||||
// need to know the latest disconnect we've observed for this account
|
||||
// (so we can decide when it's safe to drop the entry) and the timer
|
||||
// that will fire the next sweep.
|
||||
type accountEntry struct {
|
||||
lastDisconnectedAt time.Time
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
// todo: consider to remove peer from ephemeral list when the peer has been deleted via API. If we do not do it
|
||||
// in worst case we will get invalid error message in this manager.
|
||||
|
||||
// EphemeralManager keep a list of ephemeral peers. After EphemeralLifeTime inactivity the peer will be deleted
|
||||
// automatically. Inactivity means the peer disconnected from the Management server.
|
||||
// EphemeralManager tracks accounts that may have ephemeral peers in
|
||||
// need of cleanup and runs a per-account sweep at the appropriate
|
||||
// time. State is in-memory and account-scoped: a sweep deletes any
|
||||
// ephemeral peer in the account that has been disconnected for at
|
||||
// least lifeTime, then either drops the account from the tracker
|
||||
// (when no recent disconnects have arrived) or re-arms the timer.
|
||||
type EphemeralManager struct {
|
||||
store store.Store
|
||||
peersManager peers.Manager
|
||||
|
||||
headPeer *ephemeralPeer
|
||||
tailPeer *ephemeralPeer
|
||||
peersLock sync.Mutex
|
||||
timer *time.Timer
|
||||
accountsLock sync.Mutex
|
||||
accounts map[string]*accountEntry
|
||||
|
||||
// initialLoadTimer is the one-shot timer used to defer the
|
||||
// post-restart catch-up query; held so Stop() can cancel it.
|
||||
initialLoadTimer *time.Timer
|
||||
// stopped is flipped by Stop() so any timer that fires after
|
||||
// teardown becomes a no-op instead of touching a half-dismantled
|
||||
// store.
|
||||
stopped bool
|
||||
|
||||
lifeTime time.Duration
|
||||
cleanupWindow time.Duration
|
||||
|
||||
// initialLoadDelay returns the wall-clock delay to wait before
|
||||
// running the post-restart catch-up query. Pluggable so tests can
|
||||
// fire the load immediately.
|
||||
initialLoadDelay func() time.Duration
|
||||
|
||||
// bgCtx is the long-lived context captured at LoadInitialPeers
|
||||
// time. Timer-driven sweeps use it because they fire long after
|
||||
// the original gRPC handler ctx that produced an OnPeerDisconnected
|
||||
// call has been cancelled.
|
||||
bgCtx context.Context
|
||||
|
||||
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
|
||||
// no-op when the receiver is nil so deployments without an app
|
||||
// metrics provider work unchanged.
|
||||
@@ -58,228 +91,265 @@ type EphemeralManager struct {
|
||||
// NewEphemeralManager instantiate new EphemeralManager
|
||||
func NewEphemeralManager(store store.Store, peersManager peers.Manager) *EphemeralManager {
|
||||
return &EphemeralManager{
|
||||
store: store,
|
||||
peersManager: peersManager,
|
||||
|
||||
lifeTime: ephemeral.EphemeralLifeTime,
|
||||
cleanupWindow: cleanupWindow,
|
||||
store: store,
|
||||
peersManager: peersManager,
|
||||
accounts: make(map[string]*accountEntry),
|
||||
lifeTime: ephemeral.EphemeralLifeTime,
|
||||
cleanupWindow: cleanupWindow,
|
||||
initialLoadDelay: defaultInitialLoadDelay,
|
||||
}
|
||||
}
|
||||
|
||||
// SetMetrics attaches a metrics collector. Safe to call once before
|
||||
// LoadInitialPeers; later attachment is fine but earlier loads won't be
|
||||
// reflected in the gauge. Pass nil to detach.
|
||||
// SetMetrics attaches a metrics collector. Pass nil to detach.
|
||||
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
|
||||
e.peersLock.Lock()
|
||||
e.accountsLock.Lock()
|
||||
e.metrics = m
|
||||
e.peersLock.Unlock()
|
||||
e.accountsLock.Unlock()
|
||||
}
|
||||
|
||||
// LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head
|
||||
// of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new
|
||||
// head.
|
||||
// LoadInitialPeers schedules the post-restart catch-up query for a
|
||||
// random moment 8-10 minutes from now. Returns immediately. The
|
||||
// catch-up populates the per-account tracker from the database so any
|
||||
// peers that disconnected before the restart still get cleaned up.
|
||||
//
|
||||
// The random delay is critical: without it, every management replica
|
||||
// hitting the same Postgres instance after a deploy would issue the
|
||||
// catch-up query simultaneously.
|
||||
func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
|
||||
e.peersLock.Lock()
|
||||
defer e.peersLock.Unlock()
|
||||
|
||||
e.loadEphemeralPeers(ctx)
|
||||
if e.headPeer != nil {
|
||||
e.timer = time.AfterFunc(e.lifeTime, func() {
|
||||
e.cleanup(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Stop timer
|
||||
func (e *EphemeralManager) Stop() {
|
||||
e.peersLock.Lock()
|
||||
defer e.peersLock.Unlock()
|
||||
|
||||
if e.timer != nil {
|
||||
e.timer.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// OnPeerConnected remove the peer from the linked list of ephemeral peers. Because it has been called when the peer
|
||||
// is active the manager will not delete it while it is active.
|
||||
func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Peer) {
|
||||
if !peer.Ephemeral {
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
if e.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("remove peer from ephemeral list: %s", peer.ID)
|
||||
e.bgCtx = ctx
|
||||
|
||||
e.peersLock.Lock()
|
||||
defer e.peersLock.Unlock()
|
||||
|
||||
if e.removePeer(peer.ID) {
|
||||
e.metrics.DecPending(1)
|
||||
}
|
||||
|
||||
// stop the unnecessary timer
|
||||
if e.headPeer == nil && e.timer != nil {
|
||||
e.timer.Stop()
|
||||
e.timer = nil
|
||||
}
|
||||
delay := e.initialLoadDelay()
|
||||
log.WithContext(ctx).Infof("ephemeral peer initial load scheduled in %s", delay)
|
||||
e.initialLoadTimer = time.AfterFunc(delay, func() {
|
||||
e.loadInitialAccounts(e.bgCtx)
|
||||
})
|
||||
}
|
||||
|
||||
// OnPeerDisconnected add the peer to the linked list of ephemeral peers. Because of the peer
|
||||
// is inactive it will be deleted after the EphemeralLifeTime period.
|
||||
// Stop cancels the deferred initial load and any per-account timers.
|
||||
func (e *EphemeralManager) Stop() {
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
|
||||
e.stopped = true
|
||||
if e.initialLoadTimer != nil {
|
||||
e.initialLoadTimer.Stop()
|
||||
e.initialLoadTimer = nil
|
||||
}
|
||||
for _, entry := range e.accounts {
|
||||
if entry.timer != nil {
|
||||
entry.timer.Stop()
|
||||
}
|
||||
}
|
||||
e.accounts = make(map[string]*accountEntry)
|
||||
}
|
||||
|
||||
// OnPeerConnected is a no-op in the account-scoped design. The sweep
|
||||
// query filters out connected peers at the database level, so we don't
|
||||
// need an explicit "remove from list" signal when a peer reconnects.
|
||||
// Kept on the interface to preserve the existing call sites.
|
||||
func (e *EphemeralManager) OnPeerConnected(_ context.Context, _ *nbpeer.Peer) {
|
||||
}
|
||||
|
||||
// OnPeerDisconnected registers a disconnect for the peer's account and
|
||||
// arms a sweep if one isn't already scheduled. Non-ephemeral peers are
|
||||
// ignored.
|
||||
func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.Peer) {
|
||||
if !peer.Ephemeral {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("add peer to ephemeral list: %s", peer.ID)
|
||||
|
||||
e.peersLock.Lock()
|
||||
defer e.peersLock.Unlock()
|
||||
|
||||
if e.isPeerOnList(peer.ID) {
|
||||
return
|
||||
}
|
||||
|
||||
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
|
||||
e.metrics.IncPending()
|
||||
if e.timer == nil {
|
||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
||||
if delay < 0 {
|
||||
delay = 0
|
||||
}
|
||||
e.timer = time.AfterFunc(delay, func() {
|
||||
e.cleanup(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
|
||||
peers, err := e.store.GetAllEphemeralPeers(ctx, store.LockingStrengthNone)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Debugf("failed to load ephemeral peers: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
t := e.newDeadLine()
|
||||
for _, p := range peers {
|
||||
e.addPeer(p.AccountID, p.ID, t)
|
||||
}
|
||||
e.metrics.AddPending(int64(len(peers)))
|
||||
|
||||
log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers))
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) cleanup(ctx context.Context) {
|
||||
log.Tracef("on ephemeral cleanup")
|
||||
deletePeers := make(map[string]*ephemeralPeer)
|
||||
|
||||
e.peersLock.Lock()
|
||||
now := timeNow()
|
||||
for p := e.headPeer; p != nil; p = p.next {
|
||||
if now.Before(p.deadline) {
|
||||
break
|
||||
}
|
||||
|
||||
deletePeers[p.id] = p
|
||||
e.headPeer = p.next
|
||||
if p.next == nil {
|
||||
e.tailPeer = nil
|
||||
}
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
if e.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
if e.headPeer != nil {
|
||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
||||
if delay < 0 {
|
||||
delay = 0
|
||||
}
|
||||
e.timer = time.AfterFunc(delay, func() {
|
||||
e.cleanup(ctx)
|
||||
entry, existed := e.accounts[peer.AccountID]
|
||||
if !existed {
|
||||
entry = &accountEntry{}
|
||||
e.accounts[peer.AccountID] = entry
|
||||
e.metrics.IncPending()
|
||||
}
|
||||
entry.lastDisconnectedAt = now
|
||||
|
||||
if entry.timer == nil {
|
||||
delay := e.lifeTime + e.cleanupWindow
|
||||
log.WithContext(ctx).Tracef("ephemeral: scheduling sweep for account %s in %s", peer.AccountID, delay)
|
||||
accountID := peer.AccountID
|
||||
entry.timer = time.AfterFunc(delay, func() {
|
||||
e.sweep(e.bgCtxOrFallback(ctx), accountID)
|
||||
})
|
||||
} else {
|
||||
e.timer = nil
|
||||
}
|
||||
}
|
||||
|
||||
// bgCtxOrFallback returns the long-lived background context captured at
|
||||
// LoadInitialPeers time, falling back to the supplied ctx when the
|
||||
// manager hasn't been started through LoadInitialPeers (e.g. in tests
|
||||
// that drive the manager directly). Must be called with the lock held
|
||||
// or before the timer is armed.
|
||||
func (e *EphemeralManager) bgCtxOrFallback(ctx context.Context) context.Context {
|
||||
if e.bgCtx != nil {
|
||||
return e.bgCtx
|
||||
}
|
||||
return ctx
|
||||
}
|
||||
|
||||
// loadInitialAccounts runs the post-restart catch-up query and seeds
|
||||
// the tracker with one entry per account that has at least one
|
||||
// disconnected ephemeral peer.
|
||||
func (e *EphemeralManager) loadInitialAccounts(ctx context.Context) {
|
||||
accounts, err := e.store.GetEphemeralAccountsLastDisconnect(ctx)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to load ephemeral accounts on startup: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
e.peersLock.Unlock()
|
||||
now := timeNow()
|
||||
added := 0
|
||||
|
||||
// Drop the gauge by the number of entries we just took off the list,
|
||||
// regardless of whether the subsequent DeletePeers call succeeds. The
|
||||
// list invariant is what the gauge tracks; failed delete batches are
|
||||
// counted separately via CountCleanupError so we can still see them.
|
||||
if len(deletePeers) > 0 {
|
||||
e.metrics.CountCleanupRun()
|
||||
e.metrics.DecPending(int64(len(deletePeers)))
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
if e.stopped {
|
||||
return
|
||||
}
|
||||
|
||||
peerIDsPerAccount := make(map[string][]string)
|
||||
for id, p := range deletePeers {
|
||||
peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id)
|
||||
}
|
||||
|
||||
for accountID, peerIDs := range peerIDsPerAccount {
|
||||
log.WithContext(ctx).Tracef("cleanup: deleting %d ephemeral peers for account %s", len(peerIDs), accountID)
|
||||
err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err)
|
||||
e.metrics.CountCleanupError()
|
||||
for accountID, lastDisc := range accounts {
|
||||
// If we already learned about this account via an
|
||||
// OnPeerDisconnected that arrived during the random delay
|
||||
// window, prefer the live timestamp.
|
||||
if _, alreadyTracked := e.accounts[accountID]; alreadyTracked {
|
||||
continue
|
||||
}
|
||||
e.metrics.CountPeersCleaned(int64(len(peerIDs)))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
|
||||
ep := &ephemeralPeer{
|
||||
id: peerID,
|
||||
accountID: accountID,
|
||||
deadline: deadline,
|
||||
}
|
||||
entry := &accountEntry{lastDisconnectedAt: lastDisc}
|
||||
horizon := lastDisc.Add(e.lifeTime)
|
||||
|
||||
if e.headPeer == nil {
|
||||
e.headPeer = ep
|
||||
}
|
||||
if e.tailPeer != nil {
|
||||
e.tailPeer.next = ep
|
||||
}
|
||||
e.tailPeer = ep
|
||||
}
|
||||
|
||||
// removePeer drops the entry from the linked list. Returns true if a
|
||||
// matching entry was found and removed so callers can keep the pending
|
||||
// metric gauge in sync.
|
||||
func (e *EphemeralManager) removePeer(id string) bool {
|
||||
if e.headPeer == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if e.headPeer.id == id {
|
||||
e.headPeer = e.headPeer.next
|
||||
if e.tailPeer.id == id {
|
||||
e.tailPeer = nil
|
||||
var delay time.Duration
|
||||
if horizon.After(now) {
|
||||
delay = horizon.Sub(now) + e.cleanupWindow
|
||||
} else {
|
||||
// Already past the staleness window — sweep right away
|
||||
// (one cleanupWindow later, to keep startup load smooth
|
||||
// when many accounts qualify at once).
|
||||
delay = e.cleanupWindow
|
||||
}
|
||||
return true
|
||||
idForClosure := accountID
|
||||
entry.timer = time.AfterFunc(delay, func() {
|
||||
e.sweep(ctx, idForClosure)
|
||||
})
|
||||
e.accounts[accountID] = entry
|
||||
added++
|
||||
}
|
||||
|
||||
for p := e.headPeer; p.next != nil; p = p.next {
|
||||
if p.next.id == id {
|
||||
// if we remove the last element from the chain then set the last-1 as tail
|
||||
if e.tailPeer.id == id {
|
||||
e.tailPeer = p
|
||||
}
|
||||
p.next = p.next.next
|
||||
return true
|
||||
e.metrics.AddPending(int64(added))
|
||||
log.WithContext(ctx).Debugf("ephemeral: loaded %d account(s) for cleanup tracking", added)
|
||||
}
|
||||
|
||||
// sweep runs the cleanup pass for a single account. It queries the
|
||||
// database for disconnected ephemeral peers that have crossed the
|
||||
// staleness window, deletes them via peers.Manager, and then decides
|
||||
// whether to drop the account from the tracker or re-arm the timer.
|
||||
func (e *EphemeralManager) sweep(ctx context.Context, accountID string) {
|
||||
now := timeNow()
|
||||
|
||||
e.accountsLock.Lock()
|
||||
entry, ok := e.accounts[accountID]
|
||||
if !ok || e.stopped {
|
||||
e.accountsLock.Unlock()
|
||||
return
|
||||
}
|
||||
lastDisc := entry.lastDisconnectedAt
|
||||
entry.timer = nil
|
||||
e.accountsLock.Unlock()
|
||||
|
||||
threshold := now.Add(-e.lifeTime)
|
||||
stalePeerIDs, err := e.store.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, threshold)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("ephemeral: failed to query stale peers for account %s: %v", accountID, err)
|
||||
e.metrics.CountCleanupError()
|
||||
e.rearm(ctx, accountID, e.cleanupWindow)
|
||||
return
|
||||
}
|
||||
|
||||
if len(stalePeerIDs) > 0 {
|
||||
log.WithContext(ctx).Tracef("ephemeral: deleting %d peer(s) for account %s", len(stalePeerIDs), accountID)
|
||||
if err := e.peersManager.DeletePeers(ctx, accountID, stalePeerIDs, activity.SystemInitiator, true); err != nil {
|
||||
log.WithContext(ctx).Errorf("ephemeral: failed to delete peers for account %s: %v", accountID, err)
|
||||
e.metrics.CountCleanupError()
|
||||
e.rearm(ctx, accountID, e.cleanupWindow)
|
||||
return
|
||||
}
|
||||
e.metrics.CountCleanupRun()
|
||||
e.metrics.CountPeersCleaned(int64(len(stalePeerIDs)))
|
||||
}
|
||||
return false
|
||||
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
if e.stopped {
|
||||
return
|
||||
}
|
||||
entry, ok = e.accounts[accountID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Drop rule: if every disconnect we've observed has now crossed
|
||||
// the staleness window, the sweep we just ran saw everything that
|
||||
// could possibly need cleaning. Dropping is safe — a future
|
||||
// disconnect will recreate the entry. The check uses the latest
|
||||
// lastDisc, which may have advanced (concurrently with the sweep
|
||||
// itself) due to a new OnPeerDisconnected, in which case we
|
||||
// correctly re-arm.
|
||||
horizon := entry.lastDisconnectedAt.Add(e.lifeTime)
|
||||
if !horizon.After(now) {
|
||||
delete(e.accounts, accountID)
|
||||
e.metrics.DecPending(1)
|
||||
log.WithContext(ctx).Tracef("ephemeral: dropping account %s (lastDisc=%s, horizon=%s, now=%s)",
|
||||
accountID, lastDisc, horizon, now)
|
||||
return
|
||||
}
|
||||
|
||||
delay := horizon.Sub(now) + e.cleanupWindow
|
||||
idForClosure := accountID
|
||||
entry.timer = time.AfterFunc(delay, func() {
|
||||
e.sweep(ctx, idForClosure)
|
||||
})
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) isPeerOnList(id string) bool {
|
||||
for p := e.headPeer; p != nil; p = p.next {
|
||||
if p.id == id {
|
||||
return true
|
||||
}
|
||||
// rearm reschedules a sweep `delay` from now. Used after a recoverable
|
||||
// error in the sweep path so the account doesn't get stuck.
|
||||
func (e *EphemeralManager) rearm(ctx context.Context, accountID string, delay time.Duration) {
|
||||
e.accountsLock.Lock()
|
||||
defer e.accountsLock.Unlock()
|
||||
if e.stopped {
|
||||
return
|
||||
}
|
||||
return false
|
||||
entry, ok := e.accounts[accountID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
idForClosure := accountID
|
||||
entry.timer = time.AfterFunc(delay, func() {
|
||||
e.sweep(ctx, idForClosure)
|
||||
})
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) newDeadLine() time.Time {
|
||||
return timeNow().Add(e.lifeTime)
|
||||
// defaultInitialLoadDelay returns a random duration in
|
||||
// [initialLoadMinDelay, initialLoadMaxDelay). Process-wide
|
||||
// math/rand is acceptable here — the delay is purely a smoothing
|
||||
// jitter, not a security primitive.
|
||||
func defaultInitialLoadDelay() time.Duration {
|
||||
span := int64(initialLoadMaxDelay - initialLoadMinDelay)
|
||||
if span <= 0 {
|
||||
return initialLoadMinDelay
|
||||
}
|
||||
return initialLoadMinDelay + time.Duration(rand.Int63n(span))
|
||||
}
|
||||
|
||||
@@ -2,299 +2,544 @@ package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/peers"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
)
|
||||
|
||||
// MockStore is a thin in-memory stand-in that implements only the two
|
||||
// methods the EphemeralManager uses. It honors the account / ephemeral
|
||||
// / connected / lastSeen attributes of each peer so the cleanup logic
|
||||
// can be exercised end-to-end without bringing up sqlite or Postgres.
|
||||
type MockStore struct {
|
||||
store.Store
|
||||
mu sync.Mutex
|
||||
account *types.Account
|
||||
}
|
||||
|
||||
func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStrength) ([]*nbpeer.Peer, error) {
|
||||
var peers []*nbpeer.Peer
|
||||
for _, v := range s.account.Peers {
|
||||
if v.Ephemeral {
|
||||
peers = append(peers, v)
|
||||
func (s *MockStore) GetStaleEphemeralPeerIDsForAccount(_ context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.account == nil || s.account.Id != accountID {
|
||||
return nil, nil
|
||||
}
|
||||
var ids []string
|
||||
for _, p := range s.account.Peers {
|
||||
if !p.Ephemeral {
|
||||
continue
|
||||
}
|
||||
if p.Status == nil || p.Status.Connected {
|
||||
continue
|
||||
}
|
||||
if p.Status.LastSeen.Before(olderThan) {
|
||||
ids = append(ids, p.ID)
|
||||
}
|
||||
}
|
||||
return peers, nil
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
type MockAccountManager struct {
|
||||
mu sync.Mutex
|
||||
nbAccount.Manager
|
||||
store *MockStore
|
||||
deletePeerCalls int
|
||||
bufferUpdateCalls map[string]int
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.deletePeerCalls++
|
||||
delete(a.store.account.Peers, peerID)
|
||||
if a.wg != nil {
|
||||
a.wg.Done()
|
||||
func (s *MockStore) GetEphemeralAccountsLastDisconnect(_ context.Context) (map[string]time.Time, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
out := map[string]time.Time{}
|
||||
if s.account == nil {
|
||||
return out, nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) GetDeletePeerCalls() int {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
return a.deletePeerCalls
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.bufferUpdateCalls == nil {
|
||||
a.bufferUpdateCalls = make(map[string]int)
|
||||
var latest time.Time
|
||||
hasAny := false
|
||||
for _, p := range s.account.Peers {
|
||||
if !p.Ephemeral || p.Status == nil || p.Status.Connected {
|
||||
continue
|
||||
}
|
||||
if !hasAny || p.Status.LastSeen.After(latest) {
|
||||
latest = p.Status.LastSeen
|
||||
hasAny = true
|
||||
}
|
||||
}
|
||||
a.bufferUpdateCalls[accountID]++
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.bufferUpdateCalls == nil {
|
||||
return 0
|
||||
if hasAny {
|
||||
out[s.account.Id] = latest
|
||||
}
|
||||
return a.bufferUpdateCalls[accountID]
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) GetStore() store.Store {
|
||||
return a.store
|
||||
}
|
||||
|
||||
func TestNewManager(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
// withFakeClock pins timeNow to a settable value for the duration of t.
|
||||
// Returns a getter and a setter so subtests can advance virtual time.
|
||||
func withFakeClock(t *testing.T, start time.Time) (get func() time.Time, set func(time.Time)) {
|
||||
t.Helper()
|
||||
var mu sync.Mutex
|
||||
now := start
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return now
|
||||
}
|
||||
t.Cleanup(func() { timeNow = time.Now })
|
||||
|
||||
store := &MockStore{}
|
||||
ctrl := gomock.NewController(t)
|
||||
peersManager := peers.NewMockManager(ctrl)
|
||||
|
||||
numberOfPeers := 5
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
// Expect DeletePeers to be called for ephemeral peers
|
||||
peersManager.EXPECT().
|
||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
||||
for _, peerID := range peerIDs {
|
||||
delete(store.account.Peers, peerID)
|
||||
}
|
||||
return nil
|
||||
}).
|
||||
AnyTimes()
|
||||
|
||||
mgr := NewEphemeralManager(store, peersManager)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
||||
mgr.cleanup(context.Background())
|
||||
|
||||
if len(store.account.Peers) != numberOfPeers {
|
||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", numberOfPeers, len(store.account.Peers))
|
||||
}
|
||||
return func() time.Time {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
return now
|
||||
}, func(v time.Time) {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
now = v
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewManagerPeerConnected(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
|
||||
store := &MockStore{}
|
||||
ctrl := gomock.NewController(t)
|
||||
peersManager := peers.NewMockManager(ctrl)
|
||||
|
||||
numberOfPeers := 5
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
// Expect DeletePeers to be called for ephemeral peers (except the connected one)
|
||||
peersManager.EXPECT().
|
||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
||||
for _, peerID := range peerIDs {
|
||||
delete(store.account.Peers, peerID)
|
||||
}
|
||||
return nil
|
||||
}).
|
||||
AnyTimes()
|
||||
|
||||
mgr := NewEphemeralManager(store, peersManager)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
|
||||
|
||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
||||
mgr.cleanup(context.Background())
|
||||
|
||||
expected := numberOfPeers + 1
|
||||
if len(store.account.Peers) != expected {
|
||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
|
||||
}
|
||||
// newManagerForTest builds a manager with short timers and no random
|
||||
// initial-load delay so tests run instantly.
|
||||
func newManagerForTest(t *testing.T, st store.Store, peersMgr peers.Manager) *EphemeralManager {
|
||||
t.Helper()
|
||||
mgr := NewEphemeralManager(st, peersMgr)
|
||||
mgr.lifeTime = 100 * time.Millisecond
|
||||
mgr.cleanupWindow = 10 * time.Millisecond
|
||||
mgr.initialLoadDelay = func() time.Duration { return 0 }
|
||||
t.Cleanup(mgr.Stop)
|
||||
return mgr
|
||||
}
|
||||
|
||||
func TestNewManagerPeerDisconnected(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
// TestOnPeerDisconnected_RegistersAndSweeps drives the OnPeerDisconnected
|
||||
// path with a fake clock: a single ephemeral peer disconnects, we
|
||||
// advance past the staleness window, and the sweep deletes it.
|
||||
func TestOnPeerDisconnected_RegistersAndSweeps(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
|
||||
store := &MockStore{}
|
||||
ctrl := gomock.NewController(t)
|
||||
peersManager := peers.NewMockManager(ctrl)
|
||||
|
||||
numberOfPeers := 5
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
// Expect DeletePeers to be called for the one disconnected peer
|
||||
peersManager.EXPECT().
|
||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
||||
for _, peerID := range peerIDs {
|
||||
delete(store.account.Peers, peerID)
|
||||
}
|
||||
return nil
|
||||
}).
|
||||
AnyTimes()
|
||||
|
||||
mgr := NewEphemeralManager(store, peersManager)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
for _, v := range store.account.Peers {
|
||||
mgr.OnPeerConnected(context.Background(), v)
|
||||
|
||||
}
|
||||
mgr.OnPeerDisconnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
|
||||
|
||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
||||
mgr.cleanup(context.Background())
|
||||
|
||||
expected := numberOfPeers + numberOfEphemeralPeers - 1
|
||||
if len(store.account.Peers) != expected {
|
||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
|
||||
const (
|
||||
ephemeralPeers = 10
|
||||
testLifeTime = 1 * time.Second
|
||||
testCleanupWindow = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
|
||||
mockStore := &MockStore{}
|
||||
account := newAccountWithId(context.Background(), "account", "", "", false)
|
||||
mockStore.account = account
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(ephemeralPeers)
|
||||
mockAM := &MockAccountManager{
|
||||
store: mockStore,
|
||||
wg: wg,
|
||||
}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersManager := peers.NewMockManager(ctrl)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
|
||||
// Set up expectation that DeletePeers will be called once with all peer IDs
|
||||
peersManager.EXPECT().
|
||||
DeletePeers(gomock.Any(), account.Id, gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
||||
// Simulate the actual deletion behavior
|
||||
for _, peerID := range peerIDs {
|
||||
err := mockAM.DeletePeer(ctx, accountID, peerID, userID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var deletedMu sync.Mutex
|
||||
var deleted []string
|
||||
var deleteCalls atomic.Int32
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, accountID string, peerIDs []string, _ string, _ bool) error {
|
||||
deleteCalls.Add(1)
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockAM.BufferUpdateAccountPeers(ctx, accountID, types.UpdateReason{})
|
||||
mockStore.mu.Unlock()
|
||||
deletedMu.Lock()
|
||||
deleted = append(deleted, peerIDs...)
|
||||
deletedMu.Unlock()
|
||||
return nil
|
||||
}).
|
||||
Times(1)
|
||||
}).AnyTimes()
|
||||
|
||||
mgr := NewEphemeralManager(mockStore, peersManager)
|
||||
mgr.lifeTime = testLifeTime
|
||||
mgr.cleanupWindow = testCleanupWindow
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
|
||||
// Add peers and disconnect them at slightly different times (within cleanup window)
|
||||
for i := range ephemeralPeers {
|
||||
p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true}
|
||||
mockStore.account.Peers[p.ID] = p
|
||||
// One ephemeral peer that disconnected "now".
|
||||
now := getNow()
|
||||
p := &nbpeer.Peer{
|
||||
ID: "p1",
|
||||
AccountID: "acc-1",
|
||||
Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
|
||||
}
|
||||
mockStore.account.Peers[p.ID] = p
|
||||
mgr.OnPeerDisconnected(context.Background(), p)
|
||||
|
||||
// Advance past lifeTime + cleanupWindow so the timer-driven sweep fires.
|
||||
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
require.Eventually(t, func() bool { return deleteCalls.Load() >= 1 }, 2*time.Second, 5*time.Millisecond,
|
||||
"sweep should fire and delete the stale peer")
|
||||
|
||||
deletedMu.Lock()
|
||||
deletedCopy := append([]string(nil), deleted...)
|
||||
deletedMu.Unlock()
|
||||
require.Equal(t, []string{"p1"}, deletedCopy, "only the one ephemeral peer should be deleted")
|
||||
}
|
||||
|
||||
// TestOnPeerDisconnected_NonEphemeralIgnored: a non-ephemeral disconnect
|
||||
// must not register the account or arm any timer.
|
||||
func TestOnPeerDisconnected_NonEphemeralIgnored(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
// No DeletePeers expectation — must not be called.
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||
ID: "p1",
|
||||
AccountID: "acc-1",
|
||||
Ephemeral: false,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||
})
|
||||
|
||||
mgr.accountsLock.Lock()
|
||||
require.Empty(t, mgr.accounts, "non-ephemeral disconnect must not register an account")
|
||||
mgr.accountsLock.Unlock()
|
||||
}
|
||||
|
||||
// TestSweep_DropsAccountWhenIdle: after a sweep cleans the stale peers,
|
||||
// if no more disconnects have arrived the account must be dropped from
|
||||
// the in-memory tracker.
|
||||
func TestSweep_DropsAccountWhenIdle(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockStore.mu.Unlock()
|
||||
return nil
|
||||
}).AnyTimes()
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
|
||||
now := getNow()
|
||||
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
|
||||
mockStore.account.Peers[p.ID] = p
|
||||
mgr.OnPeerDisconnected(context.Background(), p)
|
||||
|
||||
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
mgr.accountsLock.Lock()
|
||||
defer mgr.accountsLock.Unlock()
|
||||
return len(mgr.accounts) == 0
|
||||
}, 2*time.Second, 5*time.Millisecond, "account should be dropped after sweep with no new disconnects")
|
||||
}
|
||||
|
||||
// TestSweep_ReArmsWhenNewDisconnectArrived: simulate the race where a
|
||||
// fresh disconnect arrives just before the sweep fires. The sweep must
|
||||
// observe the updated lastDisc and re-arm rather than drop.
|
||||
func TestSweep_ReArmsWhenNewDisconnectArrived(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockStore.mu.Unlock()
|
||||
return nil
|
||||
}).AnyTimes()
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
|
||||
now := getNow()
|
||||
p1 := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
|
||||
mockStore.account.Peers[p1.ID] = p1
|
||||
mgr.OnPeerDisconnected(context.Background(), p1)
|
||||
|
||||
// Advance most of the way toward the first sweep, then introduce
|
||||
// a fresh disconnect that resets lastDisc.
|
||||
setNow(now.Add(mgr.lifeTime - 10*time.Millisecond))
|
||||
p2 := &nbpeer.Peer{ID: "p2", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
|
||||
mockStore.account.Peers[p2.ID] = p2
|
||||
mgr.OnPeerDisconnected(context.Background(), p2)
|
||||
|
||||
// Push past p1's staleness so the first sweep runs and cleans p1
|
||||
// but observes p2 already on the account entry. It must re-arm.
|
||||
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
mockStore.mu.Lock()
|
||||
defer mockStore.mu.Unlock()
|
||||
_, gone := mockStore.account.Peers["p1"]
|
||||
return !gone
|
||||
}, 2*time.Second, 5*time.Millisecond, "p1 should be cleaned at the first sweep")
|
||||
|
||||
// The account should still be tracked because p2 is younger than lifeTime
|
||||
// from the sweep's vantage point at this moment.
|
||||
mgr.accountsLock.Lock()
|
||||
_, stillTracked := mgr.accounts["acc-1"]
|
||||
mgr.accountsLock.Unlock()
|
||||
require.True(t, stillTracked, "account should remain tracked because p2's disconnect kept it active")
|
||||
|
||||
// Push past p2's staleness; second sweep cleans p2 and drops the account.
|
||||
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
require.Eventually(t, func() bool {
|
||||
mgr.accountsLock.Lock()
|
||||
defer mgr.accountsLock.Unlock()
|
||||
return len(mgr.accounts) == 0
|
||||
}, 2*time.Second, 5*time.Millisecond, "account should drop after the final sweep")
|
||||
}
|
||||
|
||||
// TestSweep_BatchesPeersPerAccount: many ephemeral peers disconnect on
|
||||
// the same account; a single sweep must delete them all in one
|
||||
// DeletePeers call.
|
||||
func TestSweep_BatchesPeersPerAccount(t *testing.T) {
|
||||
const ephemeralPeers = 8
|
||||
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
|
||||
deleteBatches := make(chan []string, 4)
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||
cp := append([]string(nil), peerIDs...)
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockStore.mu.Unlock()
|
||||
deleteBatches <- cp
|
||||
return nil
|
||||
}).Times(1)
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
|
||||
now := getNow()
|
||||
for i := 0; i < ephemeralPeers; i++ {
|
||||
id := fmt.Sprintf("p-%d", i)
|
||||
// Stagger by a fraction of cleanupWindow so they all fall on
|
||||
// the same sweep tick.
|
||||
when := now.Add(time.Duration(i) * time.Millisecond)
|
||||
p := &nbpeer.Peer{ID: id, AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: when}}
|
||||
mockStore.account.Peers[id] = p
|
||||
mgr.OnPeerDisconnected(context.Background(), p)
|
||||
startTime = startTime.Add(testCleanupWindow / (ephemeralPeers * 2))
|
||||
}
|
||||
|
||||
// Advance time past the lifetime to trigger cleanup
|
||||
startTime = startTime.Add(testLifeTime + testCleanupWindow)
|
||||
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
|
||||
// Wait for all deletions to complete
|
||||
wg.Wait()
|
||||
|
||||
assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime")
|
||||
assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once")
|
||||
assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted all peers")
|
||||
select {
|
||||
case batch := <-deleteBatches:
|
||||
require.Len(t, batch, ephemeralPeers, "all peers should be deleted in a single batch")
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("expected one batched DeletePeers call")
|
||||
}
|
||||
}
|
||||
|
||||
func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) {
|
||||
store.account = newAccountWithId(context.Background(), "my account", "", "", false)
|
||||
// TestLoadInitialAccounts_SeedsFromStore exercises the post-restart
|
||||
// catch-up path: pre-populate the store, point the manager at it, and
|
||||
// confirm both already-stale and not-yet-stale peers get cleaned at
|
||||
// their proper times.
|
||||
func TestLoadInitialAccounts_SeedsFromStore(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
for i := 0; i < numberOfPeers; i++ {
|
||||
peerId := fmt.Sprintf("peer_%d", i)
|
||||
p := &nbpeer.Peer{
|
||||
ID: peerId,
|
||||
Ephemeral: false,
|
||||
}
|
||||
store.account.Peers[p.ID] = p
|
||||
now := getNow()
|
||||
// p-stale: already past the staleness window when load runs.
|
||||
mockStore.account.Peers["p-stale"] = &nbpeer.Peer{
|
||||
ID: "p-stale", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now.Add(-time.Hour)},
|
||||
}
|
||||
// p-fresh: disconnected but not yet stale.
|
||||
mockStore.account.Peers["p-fresh"] = &nbpeer.Peer{
|
||||
ID: "p-fresh", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
|
||||
}
|
||||
|
||||
for i := 0; i < numberOfEphemeralPeers; i++ {
|
||||
peerId := fmt.Sprintf("ephemeral_peer_%d", i)
|
||||
p := &nbpeer.Peer{
|
||||
ID: peerId,
|
||||
Ephemeral: true,
|
||||
}
|
||||
store.account.Peers[p.ID] = p
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockStore.mu.Unlock()
|
||||
return nil
|
||||
}).AnyTimes()
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
// Drive loadInitialAccounts directly with the fake-clock-aware now.
|
||||
mgr.loadInitialAccounts(context.Background())
|
||||
|
||||
// First sweep should fire shortly (cleanupWindow) for the stale peer.
|
||||
setNow(now.Add(5 * mgr.cleanupWindow))
|
||||
require.Eventually(t, func() bool {
|
||||
mockStore.mu.Lock()
|
||||
defer mockStore.mu.Unlock()
|
||||
_, gone := mockStore.account.Peers["p-stale"]
|
||||
return !gone
|
||||
}, 2*time.Second, 5*time.Millisecond, "p-stale should be deleted on the first sweep")
|
||||
|
||||
// p-fresh is not yet stale; advance past its window.
|
||||
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
require.Eventually(t, func() bool {
|
||||
mockStore.mu.Lock()
|
||||
defer mockStore.mu.Unlock()
|
||||
_, gone := mockStore.account.Peers["p-fresh"]
|
||||
return !gone
|
||||
}, 2*time.Second, 5*time.Millisecond, "p-fresh should be deleted once it crosses the staleness window")
|
||||
}
|
||||
|
||||
// TestStop_CancelsPendingWork verifies that Stop() cancels both the
|
||||
// deferred initial load and per-account sweep timers and that
|
||||
// subsequent OnPeerDisconnected calls are ignored.
|
||||
func TestStop_CancelsPendingWork(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
// DeletePeers must NOT be called after Stop.
|
||||
|
||||
mgr := NewEphemeralManager(mockStore, peersMgr)
|
||||
mgr.lifeTime = 100 * time.Millisecond
|
||||
mgr.cleanupWindow = 10 * time.Millisecond
|
||||
// Use a long delay so the initial-load timer is still pending.
|
||||
mgr.initialLoadDelay = func() time.Duration { return time.Hour }
|
||||
|
||||
mgr.LoadInitialPeers(context.Background())
|
||||
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||
ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||
})
|
||||
|
||||
mgr.accountsLock.Lock()
|
||||
require.NotNil(t, mgr.initialLoadTimer, "initial-load timer should be armed")
|
||||
require.Len(t, mgr.accounts, 1, "account should be tracked after disconnect")
|
||||
mgr.accountsLock.Unlock()
|
||||
|
||||
mgr.Stop()
|
||||
|
||||
mgr.accountsLock.Lock()
|
||||
require.Empty(t, mgr.accounts, "Stop should clear tracked accounts")
|
||||
require.True(t, mgr.stopped, "stopped flag must be set")
|
||||
mgr.accountsLock.Unlock()
|
||||
|
||||
// Post-stop disconnect must be ignored.
|
||||
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||
ID: "p2", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||
})
|
||||
mgr.accountsLock.Lock()
|
||||
require.Empty(t, mgr.accounts, "disconnects after Stop must be ignored")
|
||||
mgr.accountsLock.Unlock()
|
||||
}
|
||||
|
||||
// TestOnPeerConnected_IsNoop: the OnPeerConnected hook is preserved on
|
||||
// the interface but does nothing in the per-account model — the sweep
|
||||
// query filters connected peers at the DB level.
|
||||
func TestOnPeerConnected_IsNoop(t *testing.T) {
|
||||
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||
withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||
ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||
})
|
||||
mgr.accountsLock.Lock()
|
||||
require.Len(t, mgr.accounts, 1, "disconnect should track the account")
|
||||
mgr.accountsLock.Unlock()
|
||||
|
||||
mgr.OnPeerConnected(context.Background(), &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true})
|
||||
mgr.accountsLock.Lock()
|
||||
require.Len(t, mgr.accounts, 1, "OnPeerConnected must be a no-op")
|
||||
mgr.accountsLock.Unlock()
|
||||
}
|
||||
|
||||
// TestSweep_StoreErrorReArms: if the stale-peer query fails, the
|
||||
// account must remain tracked and a follow-up sweep gets scheduled.
|
||||
func TestSweep_StoreErrorReArms(t *testing.T) {
|
||||
mockStore := &erroringStore{
|
||||
MockStore: MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)},
|
||||
}
|
||||
getNow, setNow := withFakeClock(t, time.Now())
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
peersMgr := peers.NewMockManager(ctrl)
|
||||
|
||||
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||
|
||||
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
|
||||
mockStore.account.Peers[p.ID] = p
|
||||
mgr.OnPeerDisconnected(context.Background(), p)
|
||||
|
||||
mockStore.fail.Store(true)
|
||||
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||
|
||||
// Wait until the failing sweep has run at least once.
|
||||
require.Eventually(t, func() bool { return mockStore.failedCalls.Load() >= 1 },
|
||||
2*time.Second, 5*time.Millisecond, "expected at least one failing sweep")
|
||||
|
||||
mgr.accountsLock.Lock()
|
||||
_, stillTracked := mgr.accounts["acc-1"]
|
||||
mgr.accountsLock.Unlock()
|
||||
require.True(t, stillTracked, "account must remain tracked after a sweep error")
|
||||
|
||||
// Recover and ensure the rearmed sweep cleans up.
|
||||
peersMgr.EXPECT().
|
||||
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||
mockStore.mu.Lock()
|
||||
for _, id := range peerIDs {
|
||||
delete(mockStore.account.Peers, id)
|
||||
}
|
||||
mockStore.mu.Unlock()
|
||||
return nil
|
||||
}).AnyTimes()
|
||||
mockStore.fail.Store(false)
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
mockStore.mu.Lock()
|
||||
defer mockStore.mu.Unlock()
|
||||
_, gone := mockStore.account.Peers["p1"]
|
||||
return !gone
|
||||
}, 2*time.Second, 5*time.Millisecond, "rearmed sweep should clean up after the store recovers")
|
||||
}
|
||||
|
||||
// erroringStore is a MockStore that can be flipped into a failing mode
|
||||
// to exercise the sweep's error-rearm path.
|
||||
type erroringStore struct {
|
||||
MockStore
|
||||
fail atomic.Bool
|
||||
failedCalls atomic.Int32
|
||||
}
|
||||
|
||||
func (s *erroringStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||
if s.fail.Load() {
|
||||
s.failedCalls.Add(1)
|
||||
return nil, errors.New("synthetic store error")
|
||||
}
|
||||
return s.MockStore.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan)
|
||||
}
|
||||
|
||||
// TestDefaultInitialLoadDelay confirms the jitter falls inside the
|
||||
// documented [8m, 10m) range — sanity check for the production timer.
|
||||
func TestDefaultInitialLoadDelay(t *testing.T) {
|
||||
for i := 0; i < 1000; i++ {
|
||||
d := defaultInitialLoadDelay()
|
||||
assert.GreaterOrEqual(t, d, initialLoadMinDelay)
|
||||
assert.Less(t, d, initialLoadMaxDelay)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,3 +596,7 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain string, dis
|
||||
}
|
||||
return acc
|
||||
}
|
||||
|
||||
// silence the import "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||
// (still needed indirectly for ephemeral.EphemeralLifeTime in production paths).
|
||||
var _ = ephemeral.EphemeralLifeTime
|
||||
|
||||
@@ -3463,6 +3463,49 @@ func (s *SqlStore) GetAllEphemeralPeers(ctx context.Context, lockStrength Lockin
|
||||
return allEphemeralPeers, nil
|
||||
}
|
||||
|
||||
// GetStaleEphemeralPeerIDsForAccount returns IDs of disconnected
|
||||
// ephemeral peers in the given account whose last_seen is strictly
|
||||
// older than olderThan.
|
||||
func (s *SqlStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||
var ids []string
|
||||
err := s.db.WithContext(ctx).
|
||||
Model(&nbpeer.Peer{}).
|
||||
Where("account_id = ? AND ephemeral = ? AND peer_status_connected = ? AND peer_status_last_seen < ?",
|
||||
accountID, true, false, olderThan).
|
||||
Pluck("id", &ids).Error
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to query stale ephemeral peers for account %s: %v", accountID, err)
|
||||
return nil, status.Errorf(status.Internal, "query stale ephemeral peers")
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
// GetEphemeralAccountsLastDisconnect returns the latest peer_status_last_seen
|
||||
// per account across disconnected ephemeral peers. Returns one entry per
|
||||
// account that has at least one such peer.
|
||||
func (s *SqlStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
|
||||
type row struct {
|
||||
AccountID string
|
||||
LastSeen time.Time
|
||||
}
|
||||
var rows []row
|
||||
err := s.db.WithContext(ctx).
|
||||
Model(&nbpeer.Peer{}).
|
||||
Select("account_id, MAX(peer_status_last_seen) AS last_seen").
|
||||
Where("ephemeral = ? AND peer_status_connected = ?", true, false).
|
||||
Group("account_id").
|
||||
Scan(&rows).Error
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to load ephemeral-account last disconnect map: %v", err)
|
||||
return nil, status.Errorf(status.Internal, "load ephemeral accounts")
|
||||
}
|
||||
out := make(map[string]time.Time, len(rows))
|
||||
for _, r := range rows {
|
||||
out[r.AccountID] = r.LastSeen
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// DeletePeer removes a peer from the store.
|
||||
func (s *SqlStore) DeletePeer(ctx context.Context, accountID string, peerID string) error {
|
||||
result := s.db.Delete(&nbpeer.Peer{}, accountAndIDQueryCondition, accountID, peerID)
|
||||
|
||||
@@ -165,6 +165,15 @@ type Store interface {
|
||||
GetAccountPeersWithExpiration(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
||||
GetAccountPeersWithInactivity(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
||||
GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error)
|
||||
// GetStaleEphemeralPeerIDsForAccount returns the IDs of disconnected
|
||||
// ephemeral peers whose last_seen is strictly older than olderThan,
|
||||
// scoped to a single account. Used by the per-account cleanup sweep.
|
||||
GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error)
|
||||
// GetEphemeralAccountsLastDisconnect returns, for every account that
|
||||
// has at least one disconnected ephemeral peer, the most recent
|
||||
// last_seen across that account's disconnected ephemeral peers. Used
|
||||
// to reconstruct the per-account cleanup tracker after a restart.
|
||||
GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error)
|
||||
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
|
||||
SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error
|
||||
// MarkPeerConnectedIfNewerSession sets the peer to connected with the
|
||||
|
||||
@@ -1376,6 +1376,36 @@ func (mr *MockStoreMockRecorder) GetAllEphemeralPeers(ctx, lockStrength interfac
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllEphemeralPeers", reflect.TypeOf((*MockStore)(nil).GetAllEphemeralPeers), ctx, lockStrength)
|
||||
}
|
||||
|
||||
// GetStaleEphemeralPeerIDsForAccount mocks base method.
|
||||
func (m *MockStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetStaleEphemeralPeerIDsForAccount", ctx, accountID, olderThan)
|
||||
ret0, _ := ret[0].([]string)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetStaleEphemeralPeerIDsForAccount indicates an expected call of GetStaleEphemeralPeerIDsForAccount.
|
||||
func (mr *MockStoreMockRecorder) GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaleEphemeralPeerIDsForAccount", reflect.TypeOf((*MockStore)(nil).GetStaleEphemeralPeerIDsForAccount), ctx, accountID, olderThan)
|
||||
}
|
||||
|
||||
// GetEphemeralAccountsLastDisconnect mocks base method.
|
||||
func (m *MockStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "GetEphemeralAccountsLastDisconnect", ctx)
|
||||
ret0, _ := ret[0].(map[string]time.Time)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// GetEphemeralAccountsLastDisconnect indicates an expected call of GetEphemeralAccountsLastDisconnect.
|
||||
func (mr *MockStoreMockRecorder) GetEphemeralAccountsLastDisconnect(ctx interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEphemeralAccountsLastDisconnect", reflect.TypeOf((*MockStore)(nil).GetEphemeralAccountsLastDisconnect), ctx)
|
||||
}
|
||||
|
||||
// GetAllProxyAccessTokens mocks base method.
|
||||
func (m *MockStore) GetAllProxyAccessTokens(ctx context.Context, lockStrength LockingStrength) ([]*types2.ProxyAccessToken, error) {
|
||||
m.ctrl.T.Helper()
|
||||
|
||||
@@ -7,9 +7,9 @@ import (
|
||||
)
|
||||
|
||||
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
|
||||
// many peers are currently scheduled for deletion, how many tick runs
|
||||
// the cleaner has performed, how many peers it has removed, and how
|
||||
// many delete batches failed.
|
||||
// many accounts are currently being tracked for cleanup, how many sweep
|
||||
// runs deleted at least one peer, how many peers have been removed, and
|
||||
// how many delete batches failed.
|
||||
type EphemeralPeersMetrics struct {
|
||||
ctx context.Context
|
||||
|
||||
@@ -21,16 +21,16 @@ type EphemeralPeersMetrics struct {
|
||||
|
||||
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
|
||||
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
|
||||
pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending",
|
||||
pending, err := meter.Int64UpDownCounter("management.ephemeral.accounts.tracked",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up"))
|
||||
metric.WithDescription("Number of accounts currently tracked for ephemeral peer cleanup"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer"))
|
||||
metric.WithDescription("Number of ephemeral cleanup sweeps that deleted at least one peer"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -61,7 +61,8 @@ func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*Ephemer
|
||||
// All methods are nil-receiver safe so callers that haven't wired metrics
|
||||
// (tests, self-hosted with metrics off) can invoke them unconditionally.
|
||||
|
||||
// IncPending bumps the pending gauge when a peer is added to the cleanup list.
|
||||
// IncPending bumps the tracked-accounts gauge when a new account
|
||||
// becomes eligible for ephemeral cleanup tracking.
|
||||
func (m *EphemeralPeersMetrics) IncPending() {
|
||||
if m == nil {
|
||||
return
|
||||
@@ -69,8 +70,8 @@ func (m *EphemeralPeersMetrics) IncPending() {
|
||||
m.pending.Add(m.ctx, 1)
|
||||
}
|
||||
|
||||
// AddPending bumps the pending gauge by n — used at startup when the
|
||||
// initial set of ephemeral peers is loaded from the store.
|
||||
// AddPending bumps the tracked-accounts gauge by n — used at startup
|
||||
// when the catch-up query seeds the tracker.
|
||||
func (m *EphemeralPeersMetrics) AddPending(n int64) {
|
||||
if m == nil || n <= 0 {
|
||||
return
|
||||
@@ -78,9 +79,8 @@ func (m *EphemeralPeersMetrics) AddPending(n int64) {
|
||||
m.pending.Add(m.ctx, n)
|
||||
}
|
||||
|
||||
// DecPending decreases the pending gauge — used both when a peer reconnects
|
||||
// before its deadline (removed from the list) and when a cleanup tick
|
||||
// actually deletes it.
|
||||
// DecPending decreases the tracked-accounts gauge when an account is
|
||||
// dropped from the tracker (no more disconnects to chase).
|
||||
func (m *EphemeralPeersMetrics) DecPending(n int64) {
|
||||
if m == nil || n <= 0 {
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user