mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-30 12:39:54 +00:00
Compare commits
1 Commits
nmap/compo
...
refactor/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5eb28acb11 |
@@ -2,6 +2,7 @@ package manager
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -11,44 +12,76 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/store"
|
"github.com/netbirdio/netbird/management/server/store"
|
||||||
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure.
|
// cleanupWindow is the small grace period added on top of the
|
||||||
|
// staleness horizon before a sweep fires. It absorbs minor clock
|
||||||
|
// skew between the management server and the database and avoids
|
||||||
|
// firing a sweep right at the boundary where last_seen could still
|
||||||
|
// be one tick under the threshold.
|
||||||
cleanupWindow = 1 * time.Minute
|
cleanupWindow = 1 * time.Minute
|
||||||
|
|
||||||
|
// initialLoadMinDelay and initialLoadMaxDelay bracket the random
|
||||||
|
// delay applied before the post-restart catch-up query runs. Spread
|
||||||
|
// across replicas this prevents a thundering herd of catch-up
|
||||||
|
// queries hitting the database simultaneously after a deploy.
|
||||||
|
initialLoadMinDelay = 8 * time.Minute
|
||||||
|
initialLoadMaxDelay = 10 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
timeNow = time.Now
|
timeNow = time.Now
|
||||||
)
|
)
|
||||||
|
|
||||||
type ephemeralPeer struct {
|
// accountEntry is the per-account state held by the cleanup tracker.
|
||||||
id string
|
// We don't track which peers are pending — the sweep query gets the
|
||||||
accountID string
|
// authoritative list straight from the database every time. We only
|
||||||
deadline time.Time
|
// need to know the latest disconnect we've observed for this account
|
||||||
next *ephemeralPeer
|
// (so we can decide when it's safe to drop the entry) and the timer
|
||||||
|
// that will fire the next sweep.
|
||||||
|
type accountEntry struct {
|
||||||
|
lastDisconnectedAt time.Time
|
||||||
|
timer *time.Timer
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: consider to remove peer from ephemeral list when the peer has been deleted via API. If we do not do it
|
// EphemeralManager tracks accounts that may have ephemeral peers in
|
||||||
// in worst case we will get invalid error message in this manager.
|
// need of cleanup and runs a per-account sweep at the appropriate
|
||||||
|
// time. State is in-memory and account-scoped: a sweep deletes any
|
||||||
// EphemeralManager keep a list of ephemeral peers. After EphemeralLifeTime inactivity the peer will be deleted
|
// ephemeral peer in the account that has been disconnected for at
|
||||||
// automatically. Inactivity means the peer disconnected from the Management server.
|
// least lifeTime, then either drops the account from the tracker
|
||||||
|
// (when no recent disconnects have arrived) or re-arms the timer.
|
||||||
type EphemeralManager struct {
|
type EphemeralManager struct {
|
||||||
store store.Store
|
store store.Store
|
||||||
peersManager peers.Manager
|
peersManager peers.Manager
|
||||||
|
|
||||||
headPeer *ephemeralPeer
|
accountsLock sync.Mutex
|
||||||
tailPeer *ephemeralPeer
|
accounts map[string]*accountEntry
|
||||||
peersLock sync.Mutex
|
|
||||||
timer *time.Timer
|
// initialLoadTimer is the one-shot timer used to defer the
|
||||||
|
// post-restart catch-up query; held so Stop() can cancel it.
|
||||||
|
initialLoadTimer *time.Timer
|
||||||
|
// stopped is flipped by Stop() so any timer that fires after
|
||||||
|
// teardown becomes a no-op instead of touching a half-dismantled
|
||||||
|
// store.
|
||||||
|
stopped bool
|
||||||
|
|
||||||
lifeTime time.Duration
|
lifeTime time.Duration
|
||||||
cleanupWindow time.Duration
|
cleanupWindow time.Duration
|
||||||
|
|
||||||
|
// initialLoadDelay returns the wall-clock delay to wait before
|
||||||
|
// running the post-restart catch-up query. Pluggable so tests can
|
||||||
|
// fire the load immediately.
|
||||||
|
initialLoadDelay func() time.Duration
|
||||||
|
|
||||||
|
// bgCtx is the long-lived context captured at LoadInitialPeers
|
||||||
|
// time. Timer-driven sweeps use it because they fire long after
|
||||||
|
// the original gRPC handler ctx that produced an OnPeerDisconnected
|
||||||
|
// call has been cancelled.
|
||||||
|
bgCtx context.Context
|
||||||
|
|
||||||
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
|
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
|
||||||
// no-op when the receiver is nil so deployments without an app
|
// no-op when the receiver is nil so deployments without an app
|
||||||
// metrics provider work unchanged.
|
// metrics provider work unchanged.
|
||||||
@@ -58,228 +91,265 @@ type EphemeralManager struct {
|
|||||||
// NewEphemeralManager instantiate new EphemeralManager
|
// NewEphemeralManager instantiate new EphemeralManager
|
||||||
func NewEphemeralManager(store store.Store, peersManager peers.Manager) *EphemeralManager {
|
func NewEphemeralManager(store store.Store, peersManager peers.Manager) *EphemeralManager {
|
||||||
return &EphemeralManager{
|
return &EphemeralManager{
|
||||||
store: store,
|
store: store,
|
||||||
peersManager: peersManager,
|
peersManager: peersManager,
|
||||||
|
accounts: make(map[string]*accountEntry),
|
||||||
lifeTime: ephemeral.EphemeralLifeTime,
|
lifeTime: ephemeral.EphemeralLifeTime,
|
||||||
cleanupWindow: cleanupWindow,
|
cleanupWindow: cleanupWindow,
|
||||||
|
initialLoadDelay: defaultInitialLoadDelay,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMetrics attaches a metrics collector. Safe to call once before
|
// SetMetrics attaches a metrics collector. Pass nil to detach.
|
||||||
// LoadInitialPeers; later attachment is fine but earlier loads won't be
|
|
||||||
// reflected in the gauge. Pass nil to detach.
|
|
||||||
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
|
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
|
||||||
e.peersLock.Lock()
|
e.accountsLock.Lock()
|
||||||
e.metrics = m
|
e.metrics = m
|
||||||
e.peersLock.Unlock()
|
e.accountsLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head
|
// LoadInitialPeers schedules the post-restart catch-up query for a
|
||||||
// of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new
|
// random moment 8-10 minutes from now. Returns immediately. The
|
||||||
// head.
|
// catch-up populates the per-account tracker from the database so any
|
||||||
|
// peers that disconnected before the restart still get cleaned up.
|
||||||
|
//
|
||||||
|
// The random delay is critical: without it, every management replica
|
||||||
|
// hitting the same Postgres instance after a deploy would issue the
|
||||||
|
// catch-up query simultaneously.
|
||||||
func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
|
func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
|
||||||
e.peersLock.Lock()
|
e.accountsLock.Lock()
|
||||||
defer e.peersLock.Unlock()
|
defer e.accountsLock.Unlock()
|
||||||
|
if e.stopped {
|
||||||
e.loadEphemeralPeers(ctx)
|
|
||||||
if e.headPeer != nil {
|
|
||||||
e.timer = time.AfterFunc(e.lifeTime, func() {
|
|
||||||
e.cleanup(ctx)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop timer
|
|
||||||
func (e *EphemeralManager) Stop() {
|
|
||||||
e.peersLock.Lock()
|
|
||||||
defer e.peersLock.Unlock()
|
|
||||||
|
|
||||||
if e.timer != nil {
|
|
||||||
e.timer.Stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OnPeerConnected remove the peer from the linked list of ephemeral peers. Because it has been called when the peer
|
|
||||||
// is active the manager will not delete it while it is active.
|
|
||||||
func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Peer) {
|
|
||||||
if !peer.Ephemeral {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithContext(ctx).Tracef("remove peer from ephemeral list: %s", peer.ID)
|
e.bgCtx = ctx
|
||||||
|
|
||||||
e.peersLock.Lock()
|
delay := e.initialLoadDelay()
|
||||||
defer e.peersLock.Unlock()
|
log.WithContext(ctx).Infof("ephemeral peer initial load scheduled in %s", delay)
|
||||||
|
e.initialLoadTimer = time.AfterFunc(delay, func() {
|
||||||
if e.removePeer(peer.ID) {
|
e.loadInitialAccounts(e.bgCtx)
|
||||||
e.metrics.DecPending(1)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
// stop the unnecessary timer
|
|
||||||
if e.headPeer == nil && e.timer != nil {
|
|
||||||
e.timer.Stop()
|
|
||||||
e.timer = nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnPeerDisconnected add the peer to the linked list of ephemeral peers. Because of the peer
|
// Stop cancels the deferred initial load and any per-account timers.
|
||||||
// is inactive it will be deleted after the EphemeralLifeTime period.
|
func (e *EphemeralManager) Stop() {
|
||||||
|
e.accountsLock.Lock()
|
||||||
|
defer e.accountsLock.Unlock()
|
||||||
|
|
||||||
|
e.stopped = true
|
||||||
|
if e.initialLoadTimer != nil {
|
||||||
|
e.initialLoadTimer.Stop()
|
||||||
|
e.initialLoadTimer = nil
|
||||||
|
}
|
||||||
|
for _, entry := range e.accounts {
|
||||||
|
if entry.timer != nil {
|
||||||
|
entry.timer.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
e.accounts = make(map[string]*accountEntry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnPeerConnected is a no-op in the account-scoped design. The sweep
|
||||||
|
// query filters out connected peers at the database level, so we don't
|
||||||
|
// need an explicit "remove from list" signal when a peer reconnects.
|
||||||
|
// Kept on the interface to preserve the existing call sites.
|
||||||
|
func (e *EphemeralManager) OnPeerConnected(_ context.Context, _ *nbpeer.Peer) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnPeerDisconnected registers a disconnect for the peer's account and
|
||||||
|
// arms a sweep if one isn't already scheduled. Non-ephemeral peers are
|
||||||
|
// ignored.
|
||||||
func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.Peer) {
|
func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.Peer) {
|
||||||
if !peer.Ephemeral {
|
if !peer.Ephemeral {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithContext(ctx).Tracef("add peer to ephemeral list: %s", peer.ID)
|
|
||||||
|
|
||||||
e.peersLock.Lock()
|
|
||||||
defer e.peersLock.Unlock()
|
|
||||||
|
|
||||||
if e.isPeerOnList(peer.ID) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
|
|
||||||
e.metrics.IncPending()
|
|
||||||
if e.timer == nil {
|
|
||||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
|
||||||
if delay < 0 {
|
|
||||||
delay = 0
|
|
||||||
}
|
|
||||||
e.timer = time.AfterFunc(delay, func() {
|
|
||||||
e.cleanup(ctx)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
|
|
||||||
peers, err := e.store.GetAllEphemeralPeers(ctx, store.LockingStrengthNone)
|
|
||||||
if err != nil {
|
|
||||||
log.WithContext(ctx).Debugf("failed to load ephemeral peers: %s", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t := e.newDeadLine()
|
|
||||||
for _, p := range peers {
|
|
||||||
e.addPeer(p.AccountID, p.ID, t)
|
|
||||||
}
|
|
||||||
e.metrics.AddPending(int64(len(peers)))
|
|
||||||
|
|
||||||
log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers))
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *EphemeralManager) cleanup(ctx context.Context) {
|
|
||||||
log.Tracef("on ephemeral cleanup")
|
|
||||||
deletePeers := make(map[string]*ephemeralPeer)
|
|
||||||
|
|
||||||
e.peersLock.Lock()
|
|
||||||
now := timeNow()
|
now := timeNow()
|
||||||
for p := e.headPeer; p != nil; p = p.next {
|
|
||||||
if now.Before(p.deadline) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
deletePeers[p.id] = p
|
e.accountsLock.Lock()
|
||||||
e.headPeer = p.next
|
defer e.accountsLock.Unlock()
|
||||||
if p.next == nil {
|
if e.stopped {
|
||||||
e.tailPeer = nil
|
return
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.headPeer != nil {
|
entry, existed := e.accounts[peer.AccountID]
|
||||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
if !existed {
|
||||||
if delay < 0 {
|
entry = &accountEntry{}
|
||||||
delay = 0
|
e.accounts[peer.AccountID] = entry
|
||||||
}
|
e.metrics.IncPending()
|
||||||
e.timer = time.AfterFunc(delay, func() {
|
}
|
||||||
e.cleanup(ctx)
|
entry.lastDisconnectedAt = now
|
||||||
|
|
||||||
|
if entry.timer == nil {
|
||||||
|
delay := e.lifeTime + e.cleanupWindow
|
||||||
|
log.WithContext(ctx).Tracef("ephemeral: scheduling sweep for account %s in %s", peer.AccountID, delay)
|
||||||
|
accountID := peer.AccountID
|
||||||
|
entry.timer = time.AfterFunc(delay, func() {
|
||||||
|
e.sweep(e.bgCtxOrFallback(ctx), accountID)
|
||||||
})
|
})
|
||||||
} else {
|
}
|
||||||
e.timer = nil
|
}
|
||||||
|
|
||||||
|
// bgCtxOrFallback returns the long-lived background context captured at
|
||||||
|
// LoadInitialPeers time, falling back to the supplied ctx when the
|
||||||
|
// manager hasn't been started through LoadInitialPeers (e.g. in tests
|
||||||
|
// that drive the manager directly). Must be called with the lock held
|
||||||
|
// or before the timer is armed.
|
||||||
|
func (e *EphemeralManager) bgCtxOrFallback(ctx context.Context) context.Context {
|
||||||
|
if e.bgCtx != nil {
|
||||||
|
return e.bgCtx
|
||||||
|
}
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadInitialAccounts runs the post-restart catch-up query and seeds
|
||||||
|
// the tracker with one entry per account that has at least one
|
||||||
|
// disconnected ephemeral peer.
|
||||||
|
func (e *EphemeralManager) loadInitialAccounts(ctx context.Context) {
|
||||||
|
accounts, err := e.store.GetEphemeralAccountsLastDisconnect(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to load ephemeral accounts on startup: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
e.peersLock.Unlock()
|
now := timeNow()
|
||||||
|
added := 0
|
||||||
|
|
||||||
// Drop the gauge by the number of entries we just took off the list,
|
e.accountsLock.Lock()
|
||||||
// regardless of whether the subsequent DeletePeers call succeeds. The
|
defer e.accountsLock.Unlock()
|
||||||
// list invariant is what the gauge tracks; failed delete batches are
|
if e.stopped {
|
||||||
// counted separately via CountCleanupError so we can still see them.
|
return
|
||||||
if len(deletePeers) > 0 {
|
|
||||||
e.metrics.CountCleanupRun()
|
|
||||||
e.metrics.DecPending(int64(len(deletePeers)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
peerIDsPerAccount := make(map[string][]string)
|
for accountID, lastDisc := range accounts {
|
||||||
for id, p := range deletePeers {
|
// If we already learned about this account via an
|
||||||
peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id)
|
// OnPeerDisconnected that arrived during the random delay
|
||||||
}
|
// window, prefer the live timestamp.
|
||||||
|
if _, alreadyTracked := e.accounts[accountID]; alreadyTracked {
|
||||||
for accountID, peerIDs := range peerIDsPerAccount {
|
|
||||||
log.WithContext(ctx).Tracef("cleanup: deleting %d ephemeral peers for account %s", len(peerIDs), accountID)
|
|
||||||
err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true)
|
|
||||||
if err != nil {
|
|
||||||
log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err)
|
|
||||||
e.metrics.CountCleanupError()
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
e.metrics.CountPeersCleaned(int64(len(peerIDs)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
|
entry := &accountEntry{lastDisconnectedAt: lastDisc}
|
||||||
ep := &ephemeralPeer{
|
horizon := lastDisc.Add(e.lifeTime)
|
||||||
id: peerID,
|
|
||||||
accountID: accountID,
|
|
||||||
deadline: deadline,
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.headPeer == nil {
|
var delay time.Duration
|
||||||
e.headPeer = ep
|
if horizon.After(now) {
|
||||||
}
|
delay = horizon.Sub(now) + e.cleanupWindow
|
||||||
if e.tailPeer != nil {
|
} else {
|
||||||
e.tailPeer.next = ep
|
// Already past the staleness window — sweep right away
|
||||||
}
|
// (one cleanupWindow later, to keep startup load smooth
|
||||||
e.tailPeer = ep
|
// when many accounts qualify at once).
|
||||||
}
|
delay = e.cleanupWindow
|
||||||
|
|
||||||
// removePeer drops the entry from the linked list. Returns true if a
|
|
||||||
// matching entry was found and removed so callers can keep the pending
|
|
||||||
// metric gauge in sync.
|
|
||||||
func (e *EphemeralManager) removePeer(id string) bool {
|
|
||||||
if e.headPeer == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if e.headPeer.id == id {
|
|
||||||
e.headPeer = e.headPeer.next
|
|
||||||
if e.tailPeer.id == id {
|
|
||||||
e.tailPeer = nil
|
|
||||||
}
|
}
|
||||||
return true
|
idForClosure := accountID
|
||||||
|
entry.timer = time.AfterFunc(delay, func() {
|
||||||
|
e.sweep(ctx, idForClosure)
|
||||||
|
})
|
||||||
|
e.accounts[accountID] = entry
|
||||||
|
added++
|
||||||
}
|
}
|
||||||
|
|
||||||
for p := e.headPeer; p.next != nil; p = p.next {
|
e.metrics.AddPending(int64(added))
|
||||||
if p.next.id == id {
|
log.WithContext(ctx).Debugf("ephemeral: loaded %d account(s) for cleanup tracking", added)
|
||||||
// if we remove the last element from the chain then set the last-1 as tail
|
}
|
||||||
if e.tailPeer.id == id {
|
|
||||||
e.tailPeer = p
|
// sweep runs the cleanup pass for a single account. It queries the
|
||||||
}
|
// database for disconnected ephemeral peers that have crossed the
|
||||||
p.next = p.next.next
|
// staleness window, deletes them via peers.Manager, and then decides
|
||||||
return true
|
// whether to drop the account from the tracker or re-arm the timer.
|
||||||
|
func (e *EphemeralManager) sweep(ctx context.Context, accountID string) {
|
||||||
|
now := timeNow()
|
||||||
|
|
||||||
|
e.accountsLock.Lock()
|
||||||
|
entry, ok := e.accounts[accountID]
|
||||||
|
if !ok || e.stopped {
|
||||||
|
e.accountsLock.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
lastDisc := entry.lastDisconnectedAt
|
||||||
|
entry.timer = nil
|
||||||
|
e.accountsLock.Unlock()
|
||||||
|
|
||||||
|
threshold := now.Add(-e.lifeTime)
|
||||||
|
stalePeerIDs, err := e.store.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, threshold)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("ephemeral: failed to query stale peers for account %s: %v", accountID, err)
|
||||||
|
e.metrics.CountCleanupError()
|
||||||
|
e.rearm(ctx, accountID, e.cleanupWindow)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(stalePeerIDs) > 0 {
|
||||||
|
log.WithContext(ctx).Tracef("ephemeral: deleting %d peer(s) for account %s", len(stalePeerIDs), accountID)
|
||||||
|
if err := e.peersManager.DeletePeers(ctx, accountID, stalePeerIDs, activity.SystemInitiator, true); err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("ephemeral: failed to delete peers for account %s: %v", accountID, err)
|
||||||
|
e.metrics.CountCleanupError()
|
||||||
|
e.rearm(ctx, accountID, e.cleanupWindow)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
e.metrics.CountCleanupRun()
|
||||||
|
e.metrics.CountPeersCleaned(int64(len(stalePeerIDs)))
|
||||||
}
|
}
|
||||||
return false
|
|
||||||
|
e.accountsLock.Lock()
|
||||||
|
defer e.accountsLock.Unlock()
|
||||||
|
if e.stopped {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
entry, ok = e.accounts[accountID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Drop rule: if every disconnect we've observed has now crossed
|
||||||
|
// the staleness window, the sweep we just ran saw everything that
|
||||||
|
// could possibly need cleaning. Dropping is safe — a future
|
||||||
|
// disconnect will recreate the entry. The check uses the latest
|
||||||
|
// lastDisc, which may have advanced (concurrently with the sweep
|
||||||
|
// itself) due to a new OnPeerDisconnected, in which case we
|
||||||
|
// correctly re-arm.
|
||||||
|
horizon := entry.lastDisconnectedAt.Add(e.lifeTime)
|
||||||
|
if !horizon.After(now) {
|
||||||
|
delete(e.accounts, accountID)
|
||||||
|
e.metrics.DecPending(1)
|
||||||
|
log.WithContext(ctx).Tracef("ephemeral: dropping account %s (lastDisc=%s, horizon=%s, now=%s)",
|
||||||
|
accountID, lastDisc, horizon, now)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := horizon.Sub(now) + e.cleanupWindow
|
||||||
|
idForClosure := accountID
|
||||||
|
entry.timer = time.AfterFunc(delay, func() {
|
||||||
|
e.sweep(ctx, idForClosure)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EphemeralManager) isPeerOnList(id string) bool {
|
// rearm reschedules a sweep `delay` from now. Used after a recoverable
|
||||||
for p := e.headPeer; p != nil; p = p.next {
|
// error in the sweep path so the account doesn't get stuck.
|
||||||
if p.id == id {
|
func (e *EphemeralManager) rearm(ctx context.Context, accountID string, delay time.Duration) {
|
||||||
return true
|
e.accountsLock.Lock()
|
||||||
}
|
defer e.accountsLock.Unlock()
|
||||||
|
if e.stopped {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return false
|
entry, ok := e.accounts[accountID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
idForClosure := accountID
|
||||||
|
entry.timer = time.AfterFunc(delay, func() {
|
||||||
|
e.sweep(ctx, idForClosure)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EphemeralManager) newDeadLine() time.Time {
|
// defaultInitialLoadDelay returns a random duration in
|
||||||
return timeNow().Add(e.lifeTime)
|
// [initialLoadMinDelay, initialLoadMaxDelay). Process-wide
|
||||||
|
// math/rand is acceptable here — the delay is purely a smoothing
|
||||||
|
// jitter, not a security primitive.
|
||||||
|
func defaultInitialLoadDelay() time.Duration {
|
||||||
|
span := int64(initialLoadMaxDelay - initialLoadMinDelay)
|
||||||
|
if span <= 0 {
|
||||||
|
return initialLoadMinDelay
|
||||||
|
}
|
||||||
|
return initialLoadMinDelay + time.Duration(rand.Int63n(span))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,299 +2,544 @@ package manager
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/mock/gomock"
|
"github.com/golang/mock/gomock"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
nbdns "github.com/netbirdio/netbird/dns"
|
nbdns "github.com/netbirdio/netbird/dns"
|
||||||
"github.com/netbirdio/netbird/management/internals/modules/peers"
|
"github.com/netbirdio/netbird/management/internals/modules/peers"
|
||||||
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||||
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/store"
|
"github.com/netbirdio/netbird/management/server/store"
|
||||||
"github.com/netbirdio/netbird/management/server/types"
|
"github.com/netbirdio/netbird/management/server/types"
|
||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MockStore is a thin in-memory stand-in that implements only the two
|
||||||
|
// methods the EphemeralManager uses. It honors the account / ephemeral
|
||||||
|
// / connected / lastSeen attributes of each peer so the cleanup logic
|
||||||
|
// can be exercised end-to-end without bringing up sqlite or Postgres.
|
||||||
type MockStore struct {
|
type MockStore struct {
|
||||||
store.Store
|
store.Store
|
||||||
|
mu sync.Mutex
|
||||||
account *types.Account
|
account *types.Account
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStrength) ([]*nbpeer.Peer, error) {
|
func (s *MockStore) GetStaleEphemeralPeerIDsForAccount(_ context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||||
var peers []*nbpeer.Peer
|
s.mu.Lock()
|
||||||
for _, v := range s.account.Peers {
|
defer s.mu.Unlock()
|
||||||
if v.Ephemeral {
|
if s.account == nil || s.account.Id != accountID {
|
||||||
peers = append(peers, v)
|
return nil, nil
|
||||||
|
}
|
||||||
|
var ids []string
|
||||||
|
for _, p := range s.account.Peers {
|
||||||
|
if !p.Ephemeral {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if p.Status == nil || p.Status.Connected {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if p.Status.LastSeen.Before(olderThan) {
|
||||||
|
ids = append(ids, p.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return peers, nil
|
return ids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type MockAccountManager struct {
|
func (s *MockStore) GetEphemeralAccountsLastDisconnect(_ context.Context) (map[string]time.Time, error) {
|
||||||
mu sync.Mutex
|
s.mu.Lock()
|
||||||
nbAccount.Manager
|
defer s.mu.Unlock()
|
||||||
store *MockStore
|
out := map[string]time.Time{}
|
||||||
deletePeerCalls int
|
if s.account == nil {
|
||||||
bufferUpdateCalls map[string]int
|
return out, nil
|
||||||
wg *sync.WaitGroup
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
|
|
||||||
a.mu.Lock()
|
|
||||||
defer a.mu.Unlock()
|
|
||||||
a.deletePeerCalls++
|
|
||||||
delete(a.store.account.Peers, peerID)
|
|
||||||
if a.wg != nil {
|
|
||||||
a.wg.Done()
|
|
||||||
}
|
}
|
||||||
return nil
|
var latest time.Time
|
||||||
}
|
hasAny := false
|
||||||
|
for _, p := range s.account.Peers {
|
||||||
func (a *MockAccountManager) GetDeletePeerCalls() int {
|
if !p.Ephemeral || p.Status == nil || p.Status.Connected {
|
||||||
a.mu.Lock()
|
continue
|
||||||
defer a.mu.Unlock()
|
}
|
||||||
return a.deletePeerCalls
|
if !hasAny || p.Status.LastSeen.After(latest) {
|
||||||
}
|
latest = p.Status.LastSeen
|
||||||
|
hasAny = true
|
||||||
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
|
}
|
||||||
a.mu.Lock()
|
|
||||||
defer a.mu.Unlock()
|
|
||||||
if a.bufferUpdateCalls == nil {
|
|
||||||
a.bufferUpdateCalls = make(map[string]int)
|
|
||||||
}
|
}
|
||||||
a.bufferUpdateCalls[accountID]++
|
if hasAny {
|
||||||
}
|
out[s.account.Id] = latest
|
||||||
|
|
||||||
func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int {
|
|
||||||
a.mu.Lock()
|
|
||||||
defer a.mu.Unlock()
|
|
||||||
if a.bufferUpdateCalls == nil {
|
|
||||||
return 0
|
|
||||||
}
|
}
|
||||||
return a.bufferUpdateCalls[accountID]
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *MockAccountManager) GetStore() store.Store {
|
// withFakeClock pins timeNow to a settable value for the duration of t.
|
||||||
return a.store
|
// Returns a getter and a setter so subtests can advance virtual time.
|
||||||
}
|
func withFakeClock(t *testing.T, start time.Time) (get func() time.Time, set func(time.Time)) {
|
||||||
|
t.Helper()
|
||||||
func TestNewManager(t *testing.T) {
|
var mu sync.Mutex
|
||||||
t.Cleanup(func() {
|
now := start
|
||||||
timeNow = time.Now
|
|
||||||
})
|
|
||||||
startTime := time.Now()
|
|
||||||
timeNow = func() time.Time {
|
timeNow = func() time.Time {
|
||||||
return startTime
|
mu.Lock()
|
||||||
|
defer mu.Unlock()
|
||||||
|
return now
|
||||||
}
|
}
|
||||||
|
t.Cleanup(func() { timeNow = time.Now })
|
||||||
|
|
||||||
store := &MockStore{}
|
return func() time.Time {
|
||||||
ctrl := gomock.NewController(t)
|
mu.Lock()
|
||||||
peersManager := peers.NewMockManager(ctrl)
|
defer mu.Unlock()
|
||||||
|
return now
|
||||||
numberOfPeers := 5
|
}, func(v time.Time) {
|
||||||
numberOfEphemeralPeers := 3
|
mu.Lock()
|
||||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
defer mu.Unlock()
|
||||||
|
now = v
|
||||||
// Expect DeletePeers to be called for ephemeral peers
|
}
|
||||||
peersManager.EXPECT().
|
|
||||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
|
||||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
|
||||||
for _, peerID := range peerIDs {
|
|
||||||
delete(store.account.Peers, peerID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}).
|
|
||||||
AnyTimes()
|
|
||||||
|
|
||||||
mgr := NewEphemeralManager(store, peersManager)
|
|
||||||
mgr.loadEphemeralPeers(context.Background())
|
|
||||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
|
||||||
mgr.cleanup(context.Background())
|
|
||||||
|
|
||||||
if len(store.account.Peers) != numberOfPeers {
|
|
||||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", numberOfPeers, len(store.account.Peers))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewManagerPeerConnected(t *testing.T) {
|
// newManagerForTest builds a manager with short timers and no random
|
||||||
t.Cleanup(func() {
|
// initial-load delay so tests run instantly.
|
||||||
timeNow = time.Now
|
func newManagerForTest(t *testing.T, st store.Store, peersMgr peers.Manager) *EphemeralManager {
|
||||||
})
|
t.Helper()
|
||||||
startTime := time.Now()
|
mgr := NewEphemeralManager(st, peersMgr)
|
||||||
timeNow = func() time.Time {
|
mgr.lifeTime = 100 * time.Millisecond
|
||||||
return startTime
|
mgr.cleanupWindow = 10 * time.Millisecond
|
||||||
}
|
mgr.initialLoadDelay = func() time.Duration { return 0 }
|
||||||
|
t.Cleanup(mgr.Stop)
|
||||||
store := &MockStore{}
|
return mgr
|
||||||
ctrl := gomock.NewController(t)
|
|
||||||
peersManager := peers.NewMockManager(ctrl)
|
|
||||||
|
|
||||||
numberOfPeers := 5
|
|
||||||
numberOfEphemeralPeers := 3
|
|
||||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
|
||||||
|
|
||||||
// Expect DeletePeers to be called for ephemeral peers (except the connected one)
|
|
||||||
peersManager.EXPECT().
|
|
||||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
|
||||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
|
||||||
for _, peerID := range peerIDs {
|
|
||||||
delete(store.account.Peers, peerID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}).
|
|
||||||
AnyTimes()
|
|
||||||
|
|
||||||
mgr := NewEphemeralManager(store, peersManager)
|
|
||||||
mgr.loadEphemeralPeers(context.Background())
|
|
||||||
mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
|
|
||||||
|
|
||||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
|
||||||
mgr.cleanup(context.Background())
|
|
||||||
|
|
||||||
expected := numberOfPeers + 1
|
|
||||||
if len(store.account.Peers) != expected {
|
|
||||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewManagerPeerDisconnected(t *testing.T) {
|
// TestOnPeerDisconnected_RegistersAndSweeps drives the OnPeerDisconnected
|
||||||
t.Cleanup(func() {
|
// path with a fake clock: a single ephemeral peer disconnects, we
|
||||||
timeNow = time.Now
|
// advance past the staleness window, and the sweep deletes it.
|
||||||
})
|
func TestOnPeerDisconnected_RegistersAndSweeps(t *testing.T) {
|
||||||
startTime := time.Now()
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
timeNow = func() time.Time {
|
|
||||||
return startTime
|
|
||||||
}
|
|
||||||
|
|
||||||
store := &MockStore{}
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
ctrl := gomock.NewController(t)
|
|
||||||
peersManager := peers.NewMockManager(ctrl)
|
|
||||||
|
|
||||||
numberOfPeers := 5
|
|
||||||
numberOfEphemeralPeers := 3
|
|
||||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
|
||||||
|
|
||||||
// Expect DeletePeers to be called for the one disconnected peer
|
|
||||||
peersManager.EXPECT().
|
|
||||||
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
|
|
||||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
|
||||||
for _, peerID := range peerIDs {
|
|
||||||
delete(store.account.Peers, peerID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}).
|
|
||||||
AnyTimes()
|
|
||||||
|
|
||||||
mgr := NewEphemeralManager(store, peersManager)
|
|
||||||
mgr.loadEphemeralPeers(context.Background())
|
|
||||||
for _, v := range store.account.Peers {
|
|
||||||
mgr.OnPeerConnected(context.Background(), v)
|
|
||||||
|
|
||||||
}
|
|
||||||
mgr.OnPeerDisconnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
|
|
||||||
|
|
||||||
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
|
|
||||||
mgr.cleanup(context.Background())
|
|
||||||
|
|
||||||
expected := numberOfPeers + numberOfEphemeralPeers - 1
|
|
||||||
if len(store.account.Peers) != expected {
|
|
||||||
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
|
|
||||||
const (
|
|
||||||
ephemeralPeers = 10
|
|
||||||
testLifeTime = 1 * time.Second
|
|
||||||
testCleanupWindow = 100 * time.Millisecond
|
|
||||||
)
|
|
||||||
|
|
||||||
t.Cleanup(func() {
|
|
||||||
timeNow = time.Now
|
|
||||||
})
|
|
||||||
startTime := time.Now()
|
|
||||||
timeNow = func() time.Time {
|
|
||||||
return startTime
|
|
||||||
}
|
|
||||||
|
|
||||||
mockStore := &MockStore{}
|
|
||||||
account := newAccountWithId(context.Background(), "account", "", "", false)
|
|
||||||
mockStore.account = account
|
|
||||||
|
|
||||||
wg := &sync.WaitGroup{}
|
|
||||||
wg.Add(ephemeralPeers)
|
|
||||||
mockAM := &MockAccountManager{
|
|
||||||
store: mockStore,
|
|
||||||
wg: wg,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
peersManager := peers.NewMockManager(ctrl)
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
|
||||||
// Set up expectation that DeletePeers will be called once with all peer IDs
|
var deletedMu sync.Mutex
|
||||||
peersManager.EXPECT().
|
var deleted []string
|
||||||
DeletePeers(gomock.Any(), account.Id, gomock.Any(), gomock.Any(), true).
|
var deleteCalls atomic.Int32
|
||||||
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
|
peersMgr.EXPECT().
|
||||||
// Simulate the actual deletion behavior
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
for _, peerID := range peerIDs {
|
DoAndReturn(func(_ context.Context, accountID string, peerIDs []string, _ string, _ bool) error {
|
||||||
err := mockAM.DeletePeer(ctx, accountID, peerID, userID)
|
deleteCalls.Add(1)
|
||||||
if err != nil {
|
mockStore.mu.Lock()
|
||||||
return err
|
for _, id := range peerIDs {
|
||||||
}
|
delete(mockStore.account.Peers, id)
|
||||||
}
|
}
|
||||||
mockAM.BufferUpdateAccountPeers(ctx, accountID, types.UpdateReason{})
|
mockStore.mu.Unlock()
|
||||||
|
deletedMu.Lock()
|
||||||
|
deleted = append(deleted, peerIDs...)
|
||||||
|
deletedMu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}).
|
}).AnyTimes()
|
||||||
Times(1)
|
|
||||||
|
|
||||||
mgr := NewEphemeralManager(mockStore, peersManager)
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
mgr.lifeTime = testLifeTime
|
|
||||||
mgr.cleanupWindow = testCleanupWindow
|
|
||||||
|
|
||||||
// Add peers and disconnect them at slightly different times (within cleanup window)
|
// One ephemeral peer that disconnected "now".
|
||||||
for i := range ephemeralPeers {
|
now := getNow()
|
||||||
p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true}
|
p := &nbpeer.Peer{
|
||||||
mockStore.account.Peers[p.ID] = p
|
ID: "p1",
|
||||||
|
AccountID: "acc-1",
|
||||||
|
Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
|
||||||
|
}
|
||||||
|
mockStore.account.Peers[p.ID] = p
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), p)
|
||||||
|
|
||||||
|
// Advance past lifeTime + cleanupWindow so the timer-driven sweep fires.
|
||||||
|
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
require.Eventually(t, func() bool { return deleteCalls.Load() >= 1 }, 2*time.Second, 5*time.Millisecond,
|
||||||
|
"sweep should fire and delete the stale peer")
|
||||||
|
|
||||||
|
deletedMu.Lock()
|
||||||
|
deletedCopy := append([]string(nil), deleted...)
|
||||||
|
deletedMu.Unlock()
|
||||||
|
require.Equal(t, []string{"p1"}, deletedCopy, "only the one ephemeral peer should be deleted")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestOnPeerDisconnected_NonEphemeralIgnored: a non-ephemeral disconnect
|
||||||
|
// must not register the account or arm any timer.
|
||||||
|
func TestOnPeerDisconnected_NonEphemeralIgnored(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
// No DeletePeers expectation — must not be called.
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||||
|
ID: "p1",
|
||||||
|
AccountID: "acc-1",
|
||||||
|
Ephemeral: false,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||||
|
})
|
||||||
|
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.Empty(t, mgr.accounts, "non-ephemeral disconnect must not register an account")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSweep_DropsAccountWhenIdle: after a sweep cleans the stale peers,
|
||||||
|
// if no more disconnects have arrived the account must be dropped from
|
||||||
|
// the in-memory tracker.
|
||||||
|
func TestSweep_DropsAccountWhenIdle(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
peersMgr.EXPECT().
|
||||||
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
|
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
for _, id := range peerIDs {
|
||||||
|
delete(mockStore.account.Peers, id)
|
||||||
|
}
|
||||||
|
mockStore.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}).AnyTimes()
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
|
||||||
|
now := getNow()
|
||||||
|
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
|
||||||
|
mockStore.account.Peers[p.ID] = p
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), p)
|
||||||
|
|
||||||
|
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
defer mgr.accountsLock.Unlock()
|
||||||
|
return len(mgr.accounts) == 0
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "account should be dropped after sweep with no new disconnects")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSweep_ReArmsWhenNewDisconnectArrived: simulate the race where a
|
||||||
|
// fresh disconnect arrives just before the sweep fires. The sweep must
|
||||||
|
// observe the updated lastDisc and re-arm rather than drop.
|
||||||
|
func TestSweep_ReArmsWhenNewDisconnectArrived(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
peersMgr.EXPECT().
|
||||||
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
|
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
for _, id := range peerIDs {
|
||||||
|
delete(mockStore.account.Peers, id)
|
||||||
|
}
|
||||||
|
mockStore.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}).AnyTimes()
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
|
||||||
|
now := getNow()
|
||||||
|
p1 := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
|
||||||
|
mockStore.account.Peers[p1.ID] = p1
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), p1)
|
||||||
|
|
||||||
|
// Advance most of the way toward the first sweep, then introduce
|
||||||
|
// a fresh disconnect that resets lastDisc.
|
||||||
|
setNow(now.Add(mgr.lifeTime - 10*time.Millisecond))
|
||||||
|
p2 := &nbpeer.Peer{ID: "p2", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
|
||||||
|
mockStore.account.Peers[p2.ID] = p2
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), p2)
|
||||||
|
|
||||||
|
// Push past p1's staleness so the first sweep runs and cleans p1
|
||||||
|
// but observes p2 already on the account entry. It must re-arm.
|
||||||
|
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
defer mockStore.mu.Unlock()
|
||||||
|
_, gone := mockStore.account.Peers["p1"]
|
||||||
|
return !gone
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "p1 should be cleaned at the first sweep")
|
||||||
|
|
||||||
|
// The account should still be tracked because p2 is younger than lifeTime
|
||||||
|
// from the sweep's vantage point at this moment.
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
_, stillTracked := mgr.accounts["acc-1"]
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
require.True(t, stillTracked, "account should remain tracked because p2's disconnect kept it active")
|
||||||
|
|
||||||
|
// Push past p2's staleness; second sweep cleans p2 and drops the account.
|
||||||
|
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
defer mgr.accountsLock.Unlock()
|
||||||
|
return len(mgr.accounts) == 0
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "account should drop after the final sweep")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSweep_BatchesPeersPerAccount: many ephemeral peers disconnect on
|
||||||
|
// the same account; a single sweep must delete them all in one
|
||||||
|
// DeletePeers call.
|
||||||
|
func TestSweep_BatchesPeersPerAccount(t *testing.T) {
|
||||||
|
const ephemeralPeers = 8
|
||||||
|
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
|
||||||
|
deleteBatches := make(chan []string, 4)
|
||||||
|
peersMgr.EXPECT().
|
||||||
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
|
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||||
|
cp := append([]string(nil), peerIDs...)
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
for _, id := range peerIDs {
|
||||||
|
delete(mockStore.account.Peers, id)
|
||||||
|
}
|
||||||
|
mockStore.mu.Unlock()
|
||||||
|
deleteBatches <- cp
|
||||||
|
return nil
|
||||||
|
}).Times(1)
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
|
||||||
|
now := getNow()
|
||||||
|
for i := 0; i < ephemeralPeers; i++ {
|
||||||
|
id := fmt.Sprintf("p-%d", i)
|
||||||
|
// Stagger by a fraction of cleanupWindow so they all fall on
|
||||||
|
// the same sweep tick.
|
||||||
|
when := now.Add(time.Duration(i) * time.Millisecond)
|
||||||
|
p := &nbpeer.Peer{ID: id, AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: when}}
|
||||||
|
mockStore.account.Peers[id] = p
|
||||||
mgr.OnPeerDisconnected(context.Background(), p)
|
mgr.OnPeerDisconnected(context.Background(), p)
|
||||||
startTime = startTime.Add(testCleanupWindow / (ephemeralPeers * 2))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Advance time past the lifetime to trigger cleanup
|
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
startTime = startTime.Add(testLifeTime + testCleanupWindow)
|
|
||||||
|
|
||||||
// Wait for all deletions to complete
|
select {
|
||||||
wg.Wait()
|
case batch := <-deleteBatches:
|
||||||
|
require.Len(t, batch, ephemeralPeers, "all peers should be deleted in a single batch")
|
||||||
assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime")
|
case <-time.After(2 * time.Second):
|
||||||
assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once")
|
t.Fatal("expected one batched DeletePeers call")
|
||||||
assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted all peers")
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) {
|
// TestLoadInitialAccounts_SeedsFromStore exercises the post-restart
|
||||||
store.account = newAccountWithId(context.Background(), "my account", "", "", false)
|
// catch-up path: pre-populate the store, point the manager at it, and
|
||||||
|
// confirm both already-stale and not-yet-stale peers get cleaned at
|
||||||
|
// their proper times.
|
||||||
|
func TestLoadInitialAccounts_SeedsFromStore(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
|
|
||||||
for i := 0; i < numberOfPeers; i++ {
|
now := getNow()
|
||||||
peerId := fmt.Sprintf("peer_%d", i)
|
// p-stale: already past the staleness window when load runs.
|
||||||
p := &nbpeer.Peer{
|
mockStore.account.Peers["p-stale"] = &nbpeer.Peer{
|
||||||
ID: peerId,
|
ID: "p-stale", AccountID: "acc-1", Ephemeral: true,
|
||||||
Ephemeral: false,
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now.Add(-time.Hour)},
|
||||||
}
|
}
|
||||||
store.account.Peers[p.ID] = p
|
// p-fresh: disconnected but not yet stale.
|
||||||
|
mockStore.account.Peers["p-fresh"] = &nbpeer.Peer{
|
||||||
|
ID: "p-fresh", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < numberOfEphemeralPeers; i++ {
|
ctrl := gomock.NewController(t)
|
||||||
peerId := fmt.Sprintf("ephemeral_peer_%d", i)
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
p := &nbpeer.Peer{
|
peersMgr.EXPECT().
|
||||||
ID: peerId,
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
Ephemeral: true,
|
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||||
}
|
mockStore.mu.Lock()
|
||||||
store.account.Peers[p.ID] = p
|
for _, id := range peerIDs {
|
||||||
|
delete(mockStore.account.Peers, id)
|
||||||
|
}
|
||||||
|
mockStore.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}).AnyTimes()
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
// Drive loadInitialAccounts directly with the fake-clock-aware now.
|
||||||
|
mgr.loadInitialAccounts(context.Background())
|
||||||
|
|
||||||
|
// First sweep should fire shortly (cleanupWindow) for the stale peer.
|
||||||
|
setNow(now.Add(5 * mgr.cleanupWindow))
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
defer mockStore.mu.Unlock()
|
||||||
|
_, gone := mockStore.account.Peers["p-stale"]
|
||||||
|
return !gone
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "p-stale should be deleted on the first sweep")
|
||||||
|
|
||||||
|
// p-fresh is not yet stale; advance past its window.
|
||||||
|
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
defer mockStore.mu.Unlock()
|
||||||
|
_, gone := mockStore.account.Peers["p-fresh"]
|
||||||
|
return !gone
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "p-fresh should be deleted once it crosses the staleness window")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStop_CancelsPendingWork verifies that Stop() cancels both the
|
||||||
|
// deferred initial load and per-account sweep timers and that
|
||||||
|
// subsequent OnPeerDisconnected calls are ignored.
|
||||||
|
func TestStop_CancelsPendingWork(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
// DeletePeers must NOT be called after Stop.
|
||||||
|
|
||||||
|
mgr := NewEphemeralManager(mockStore, peersMgr)
|
||||||
|
mgr.lifeTime = 100 * time.Millisecond
|
||||||
|
mgr.cleanupWindow = 10 * time.Millisecond
|
||||||
|
// Use a long delay so the initial-load timer is still pending.
|
||||||
|
mgr.initialLoadDelay = func() time.Duration { return time.Hour }
|
||||||
|
|
||||||
|
mgr.LoadInitialPeers(context.Background())
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||||
|
ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||||
|
})
|
||||||
|
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.NotNil(t, mgr.initialLoadTimer, "initial-load timer should be armed")
|
||||||
|
require.Len(t, mgr.accounts, 1, "account should be tracked after disconnect")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
|
||||||
|
mgr.Stop()
|
||||||
|
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.Empty(t, mgr.accounts, "Stop should clear tracked accounts")
|
||||||
|
require.True(t, mgr.stopped, "stopped flag must be set")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
|
||||||
|
// Post-stop disconnect must be ignored.
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||||
|
ID: "p2", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||||
|
})
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.Empty(t, mgr.accounts, "disconnects after Stop must be ignored")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestOnPeerConnected_IsNoop: the OnPeerConnected hook is preserved on
|
||||||
|
// the interface but does nothing in the per-account model — the sweep
|
||||||
|
// query filters connected peers at the DB level.
|
||||||
|
func TestOnPeerConnected_IsNoop(t *testing.T) {
|
||||||
|
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
|
||||||
|
withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
|
||||||
|
ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
|
||||||
|
})
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.Len(t, mgr.accounts, 1, "disconnect should track the account")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
|
||||||
|
mgr.OnPeerConnected(context.Background(), &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true})
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
require.Len(t, mgr.accounts, 1, "OnPeerConnected must be a no-op")
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSweep_StoreErrorReArms: if the stale-peer query fails, the
|
||||||
|
// account must remain tracked and a follow-up sweep gets scheduled.
|
||||||
|
func TestSweep_StoreErrorReArms(t *testing.T) {
|
||||||
|
mockStore := &erroringStore{
|
||||||
|
MockStore: MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)},
|
||||||
|
}
|
||||||
|
getNow, setNow := withFakeClock(t, time.Now())
|
||||||
|
|
||||||
|
ctrl := gomock.NewController(t)
|
||||||
|
peersMgr := peers.NewMockManager(ctrl)
|
||||||
|
|
||||||
|
mgr := newManagerForTest(t, mockStore, peersMgr)
|
||||||
|
|
||||||
|
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
|
||||||
|
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
|
||||||
|
mockStore.account.Peers[p.ID] = p
|
||||||
|
mgr.OnPeerDisconnected(context.Background(), p)
|
||||||
|
|
||||||
|
mockStore.fail.Store(true)
|
||||||
|
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
|
||||||
|
|
||||||
|
// Wait until the failing sweep has run at least once.
|
||||||
|
require.Eventually(t, func() bool { return mockStore.failedCalls.Load() >= 1 },
|
||||||
|
2*time.Second, 5*time.Millisecond, "expected at least one failing sweep")
|
||||||
|
|
||||||
|
mgr.accountsLock.Lock()
|
||||||
|
_, stillTracked := mgr.accounts["acc-1"]
|
||||||
|
mgr.accountsLock.Unlock()
|
||||||
|
require.True(t, stillTracked, "account must remain tracked after a sweep error")
|
||||||
|
|
||||||
|
// Recover and ensure the rearmed sweep cleans up.
|
||||||
|
peersMgr.EXPECT().
|
||||||
|
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
|
||||||
|
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
for _, id := range peerIDs {
|
||||||
|
delete(mockStore.account.Peers, id)
|
||||||
|
}
|
||||||
|
mockStore.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}).AnyTimes()
|
||||||
|
mockStore.fail.Store(false)
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
mockStore.mu.Lock()
|
||||||
|
defer mockStore.mu.Unlock()
|
||||||
|
_, gone := mockStore.account.Peers["p1"]
|
||||||
|
return !gone
|
||||||
|
}, 2*time.Second, 5*time.Millisecond, "rearmed sweep should clean up after the store recovers")
|
||||||
|
}
|
||||||
|
|
||||||
|
// erroringStore is a MockStore that can be flipped into a failing mode
|
||||||
|
// to exercise the sweep's error-rearm path.
|
||||||
|
type erroringStore struct {
|
||||||
|
MockStore
|
||||||
|
fail atomic.Bool
|
||||||
|
failedCalls atomic.Int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *erroringStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||||
|
if s.fail.Load() {
|
||||||
|
s.failedCalls.Add(1)
|
||||||
|
return nil, errors.New("synthetic store error")
|
||||||
|
}
|
||||||
|
return s.MockStore.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDefaultInitialLoadDelay confirms the jitter falls inside the
|
||||||
|
// documented [8m, 10m) range — sanity check for the production timer.
|
||||||
|
func TestDefaultInitialLoadDelay(t *testing.T) {
|
||||||
|
for i := 0; i < 1000; i++ {
|
||||||
|
d := defaultInitialLoadDelay()
|
||||||
|
assert.GreaterOrEqual(t, d, initialLoadMinDelay)
|
||||||
|
assert.Less(t, d, initialLoadMaxDelay)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -351,3 +596,7 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain string, dis
|
|||||||
}
|
}
|
||||||
return acc
|
return acc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// silence the import "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
|
||||||
|
// (still needed indirectly for ephemeral.EphemeralLifeTime in production paths).
|
||||||
|
var _ = ephemeral.EphemeralLifeTime
|
||||||
|
|||||||
@@ -3463,6 +3463,49 @@ func (s *SqlStore) GetAllEphemeralPeers(ctx context.Context, lockStrength Lockin
|
|||||||
return allEphemeralPeers, nil
|
return allEphemeralPeers, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStaleEphemeralPeerIDsForAccount returns IDs of disconnected
|
||||||
|
// ephemeral peers in the given account whose last_seen is strictly
|
||||||
|
// older than olderThan.
|
||||||
|
func (s *SqlStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||||
|
var ids []string
|
||||||
|
err := s.db.WithContext(ctx).
|
||||||
|
Model(&nbpeer.Peer{}).
|
||||||
|
Where("account_id = ? AND ephemeral = ? AND peer_status_connected = ? AND peer_status_last_seen < ?",
|
||||||
|
accountID, true, false, olderThan).
|
||||||
|
Pluck("id", &ids).Error
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to query stale ephemeral peers for account %s: %v", accountID, err)
|
||||||
|
return nil, status.Errorf(status.Internal, "query stale ephemeral peers")
|
||||||
|
}
|
||||||
|
return ids, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEphemeralAccountsLastDisconnect returns the latest peer_status_last_seen
|
||||||
|
// per account across disconnected ephemeral peers. Returns one entry per
|
||||||
|
// account that has at least one such peer.
|
||||||
|
func (s *SqlStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
|
||||||
|
type row struct {
|
||||||
|
AccountID string
|
||||||
|
LastSeen time.Time
|
||||||
|
}
|
||||||
|
var rows []row
|
||||||
|
err := s.db.WithContext(ctx).
|
||||||
|
Model(&nbpeer.Peer{}).
|
||||||
|
Select("account_id, MAX(peer_status_last_seen) AS last_seen").
|
||||||
|
Where("ephemeral = ? AND peer_status_connected = ?", true, false).
|
||||||
|
Group("account_id").
|
||||||
|
Scan(&rows).Error
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to load ephemeral-account last disconnect map: %v", err)
|
||||||
|
return nil, status.Errorf(status.Internal, "load ephemeral accounts")
|
||||||
|
}
|
||||||
|
out := make(map[string]time.Time, len(rows))
|
||||||
|
for _, r := range rows {
|
||||||
|
out[r.AccountID] = r.LastSeen
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeletePeer removes a peer from the store.
|
// DeletePeer removes a peer from the store.
|
||||||
func (s *SqlStore) DeletePeer(ctx context.Context, accountID string, peerID string) error {
|
func (s *SqlStore) DeletePeer(ctx context.Context, accountID string, peerID string) error {
|
||||||
result := s.db.Delete(&nbpeer.Peer{}, accountAndIDQueryCondition, accountID, peerID)
|
result := s.db.Delete(&nbpeer.Peer{}, accountAndIDQueryCondition, accountID, peerID)
|
||||||
|
|||||||
@@ -165,6 +165,15 @@ type Store interface {
|
|||||||
GetAccountPeersWithExpiration(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
GetAccountPeersWithExpiration(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
||||||
GetAccountPeersWithInactivity(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
GetAccountPeersWithInactivity(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
|
||||||
GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error)
|
GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error)
|
||||||
|
// GetStaleEphemeralPeerIDsForAccount returns the IDs of disconnected
|
||||||
|
// ephemeral peers whose last_seen is strictly older than olderThan,
|
||||||
|
// scoped to a single account. Used by the per-account cleanup sweep.
|
||||||
|
GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error)
|
||||||
|
// GetEphemeralAccountsLastDisconnect returns, for every account that
|
||||||
|
// has at least one disconnected ephemeral peer, the most recent
|
||||||
|
// last_seen across that account's disconnected ephemeral peers. Used
|
||||||
|
// to reconstruct the per-account cleanup tracker after a restart.
|
||||||
|
GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error)
|
||||||
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
|
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
|
||||||
SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error
|
SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error
|
||||||
// MarkPeerConnectedIfNewerSession sets the peer to connected with the
|
// MarkPeerConnectedIfNewerSession sets the peer to connected with the
|
||||||
|
|||||||
@@ -1376,6 +1376,36 @@ func (mr *MockStoreMockRecorder) GetAllEphemeralPeers(ctx, lockStrength interfac
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllEphemeralPeers", reflect.TypeOf((*MockStore)(nil).GetAllEphemeralPeers), ctx, lockStrength)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllEphemeralPeers", reflect.TypeOf((*MockStore)(nil).GetAllEphemeralPeers), ctx, lockStrength)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetStaleEphemeralPeerIDsForAccount mocks base method.
|
||||||
|
func (m *MockStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetStaleEphemeralPeerIDsForAccount", ctx, accountID, olderThan)
|
||||||
|
ret0, _ := ret[0].([]string)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetStaleEphemeralPeerIDsForAccount indicates an expected call of GetStaleEphemeralPeerIDsForAccount.
|
||||||
|
func (mr *MockStoreMockRecorder) GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaleEphemeralPeerIDsForAccount", reflect.TypeOf((*MockStore)(nil).GetStaleEphemeralPeerIDsForAccount), ctx, accountID, olderThan)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEphemeralAccountsLastDisconnect mocks base method.
|
||||||
|
func (m *MockStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetEphemeralAccountsLastDisconnect", ctx)
|
||||||
|
ret0, _ := ret[0].(map[string]time.Time)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEphemeralAccountsLastDisconnect indicates an expected call of GetEphemeralAccountsLastDisconnect.
|
||||||
|
func (mr *MockStoreMockRecorder) GetEphemeralAccountsLastDisconnect(ctx interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEphemeralAccountsLastDisconnect", reflect.TypeOf((*MockStore)(nil).GetEphemeralAccountsLastDisconnect), ctx)
|
||||||
|
}
|
||||||
|
|
||||||
// GetAllProxyAccessTokens mocks base method.
|
// GetAllProxyAccessTokens mocks base method.
|
||||||
func (m *MockStore) GetAllProxyAccessTokens(ctx context.Context, lockStrength LockingStrength) ([]*types2.ProxyAccessToken, error) {
|
func (m *MockStore) GetAllProxyAccessTokens(ctx context.Context, lockStrength LockingStrength) ([]*types2.ProxyAccessToken, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
@@ -7,9 +7,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
|
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
|
||||||
// many peers are currently scheduled for deletion, how many tick runs
|
// many accounts are currently being tracked for cleanup, how many sweep
|
||||||
// the cleaner has performed, how many peers it has removed, and how
|
// runs deleted at least one peer, how many peers have been removed, and
|
||||||
// many delete batches failed.
|
// how many delete batches failed.
|
||||||
type EphemeralPeersMetrics struct {
|
type EphemeralPeersMetrics struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
|
||||||
@@ -21,16 +21,16 @@ type EphemeralPeersMetrics struct {
|
|||||||
|
|
||||||
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
|
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
|
||||||
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
|
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
|
||||||
pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending",
|
pending, err := meter.Int64UpDownCounter("management.ephemeral.accounts.tracked",
|
||||||
metric.WithUnit("1"),
|
metric.WithUnit("1"),
|
||||||
metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up"))
|
metric.WithDescription("Number of accounts currently tracked for ephemeral peer cleanup"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
|
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
|
||||||
metric.WithUnit("1"),
|
metric.WithUnit("1"),
|
||||||
metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer"))
|
metric.WithDescription("Number of ephemeral cleanup sweeps that deleted at least one peer"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -61,7 +61,8 @@ func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*Ephemer
|
|||||||
// All methods are nil-receiver safe so callers that haven't wired metrics
|
// All methods are nil-receiver safe so callers that haven't wired metrics
|
||||||
// (tests, self-hosted with metrics off) can invoke them unconditionally.
|
// (tests, self-hosted with metrics off) can invoke them unconditionally.
|
||||||
|
|
||||||
// IncPending bumps the pending gauge when a peer is added to the cleanup list.
|
// IncPending bumps the tracked-accounts gauge when a new account
|
||||||
|
// becomes eligible for ephemeral cleanup tracking.
|
||||||
func (m *EphemeralPeersMetrics) IncPending() {
|
func (m *EphemeralPeersMetrics) IncPending() {
|
||||||
if m == nil {
|
if m == nil {
|
||||||
return
|
return
|
||||||
@@ -69,8 +70,8 @@ func (m *EphemeralPeersMetrics) IncPending() {
|
|||||||
m.pending.Add(m.ctx, 1)
|
m.pending.Add(m.ctx, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddPending bumps the pending gauge by n — used at startup when the
|
// AddPending bumps the tracked-accounts gauge by n — used at startup
|
||||||
// initial set of ephemeral peers is loaded from the store.
|
// when the catch-up query seeds the tracker.
|
||||||
func (m *EphemeralPeersMetrics) AddPending(n int64) {
|
func (m *EphemeralPeersMetrics) AddPending(n int64) {
|
||||||
if m == nil || n <= 0 {
|
if m == nil || n <= 0 {
|
||||||
return
|
return
|
||||||
@@ -78,9 +79,8 @@ func (m *EphemeralPeersMetrics) AddPending(n int64) {
|
|||||||
m.pending.Add(m.ctx, n)
|
m.pending.Add(m.ctx, n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DecPending decreases the pending gauge — used both when a peer reconnects
|
// DecPending decreases the tracked-accounts gauge when an account is
|
||||||
// before its deadline (removed from the list) and when a cleanup tick
|
// dropped from the tracker (no more disconnects to chase).
|
||||||
// actually deletes it.
|
|
||||||
func (m *EphemeralPeersMetrics) DecPending(n int64) {
|
func (m *EphemeralPeersMetrics) DecPending(n int64) {
|
||||||
if m == nil || n <= 0 {
|
if m == nil || n <= 0 {
|
||||||
return
|
return
|
||||||
|
|||||||
Reference in New Issue
Block a user