Compare commits

..

4 Commits

Author SHA1 Message Date
Zoltán Papp
48b1ab010e make ExtendAuthSession JWT-retry backoff cancellable
Skip the retry log and 200ms wait on the final attempt, and replace the
uncancellable time.Sleep with a select on time.After/ctx.Done so an
upstream cancellation aborts the wait instead of running it to
completion.
2026-05-19 17:35:56 +02:00
Zoltán Papp
cfeb15fe2a add UserExtendedPeerSession activity event
ExtendAuthSession previously reused UserLoggedInPeer for its audit
record, which conflated two distinct user actions: a full interactive
SSO login (tunnel re-established, network map resync) versus an
in-place deadline refresh (tunnel untouched). Auditors reading the log
couldn't tell which one happened, and downstream dashboards/alerts on
"login" volume were polluted by routine extends.

Adds a dedicated UserExtendedPeerSession Activity (code 125,
"user.peer.session.extend") and switches ExtendPeerSession over to it.
The peer-extend audit trail is now distinguishable from interactive
logins.
2026-05-19 14:55:22 +02:00
Zoltán Papp
7de3242a27 encode SessionExpiresAt as 3-state on the wire
Previously the `sessionExpiresAt` field on LoginResponse, SyncResponse
and ExtendAuthSessionResponse was 2-state: a valid timestamp meant
"new deadline", and nil meant "clear". That conflated two distinct
meanings — "no info in this snapshot" vs "expiry is explicitly off /
peer is not SSO-tracked" — so a Sync push that legitimately couldn't
compute the deadline (settings lookup failed) would silently clear the
client's anchor and lose the warning window.

Three states now, encoded on the same field number (no .proto schema
churn — only comments and the server-side encoder change):

  - nil pointer (field absent) → "no info"; client preserves anchor
  - &Timestamp{} (seconds=0, nanos=0) → explicit "disabled / not SSO"
    sentinel; client clears
  - valid timestamp → new absolute UTC deadline

A new encodeSessionExpiresAt helper centralises the zero/non-zero
encoding and is shared by the Sync, Login and ExtendAuthSession
builders. The Sync builder still emits nil when settings are missing.
Login and ExtendAuthSession always carry an authoritative value.

The matching client-side decoder lands on feature/session-extend.
2026-05-19 14:46:27 +02:00
Zoltán Papp
6dcba89a46 add SSO session extend flow (management)
Adds the management-server half of the SSO session-extension feature:

- New ExtendAuthSession gRPC RPC that refreshes a peer's session expiry
  using a fresh JWT, validated through the same pipeline as Login but
  without tearing down the tunnel or redoing the NetworkMap sync.
- Per-peer SessionExpiresAt timestamp on every LoginResponse and
  SyncResponse so connected clients learn the deadline on the existing
  long-lived stream, and admin-side changes (toggling expiration,
  changing the expiration window) reach every peer within seconds.
- SessionExpiresAt(...) helper on Peer that derives the absolute UTC
  deadline from LastLogin + the account-level PeerLoginExpiration
  setting, returning zero when the peer is not SSO-tracked or expiration
  is disabled.

The matching client-side consumer of these fields lands separately.
2026-05-18 23:37:02 +02:00
22 changed files with 1941 additions and 1727 deletions

View File

@@ -2,7 +2,6 @@ package manager
import (
"context"
"math/rand"
"sync"
"time"
@@ -12,76 +11,44 @@ import (
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
"github.com/netbirdio/netbird/management/server/activity"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/store"
)
const (
// cleanupWindow is the small grace period added on top of the
// staleness horizon before a sweep fires. It absorbs minor clock
// skew between the management server and the database and avoids
// firing a sweep right at the boundary where last_seen could still
// be one tick under the threshold.
// cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure.
cleanupWindow = 1 * time.Minute
// initialLoadMinDelay and initialLoadMaxDelay bracket the random
// delay applied before the post-restart catch-up query runs. Spread
// across replicas this prevents a thundering herd of catch-up
// queries hitting the database simultaneously after a deploy.
initialLoadMinDelay = 8 * time.Minute
initialLoadMaxDelay = 10 * time.Minute
)
var (
timeNow = time.Now
)
// accountEntry is the per-account state held by the cleanup tracker.
// We don't track which peers are pending — the sweep query gets the
// authoritative list straight from the database every time. We only
// need to know the latest disconnect we've observed for this account
// (so we can decide when it's safe to drop the entry) and the timer
// that will fire the next sweep.
type accountEntry struct {
lastDisconnectedAt time.Time
timer *time.Timer
type ephemeralPeer struct {
id string
accountID string
deadline time.Time
next *ephemeralPeer
}
// EphemeralManager tracks accounts that may have ephemeral peers in
// need of cleanup and runs a per-account sweep at the appropriate
// time. State is in-memory and account-scoped: a sweep deletes any
// ephemeral peer in the account that has been disconnected for at
// least lifeTime, then either drops the account from the tracker
// (when no recent disconnects have arrived) or re-arms the timer.
// todo: consider to remove peer from ephemeral list when the peer has been deleted via API. If we do not do it
// in worst case we will get invalid error message in this manager.
// EphemeralManager keep a list of ephemeral peers. After EphemeralLifeTime inactivity the peer will be deleted
// automatically. Inactivity means the peer disconnected from the Management server.
type EphemeralManager struct {
store store.Store
peersManager peers.Manager
accountsLock sync.Mutex
accounts map[string]*accountEntry
// initialLoadTimer is the one-shot timer used to defer the
// post-restart catch-up query; held so Stop() can cancel it.
initialLoadTimer *time.Timer
// stopped is flipped by Stop() so any timer that fires after
// teardown becomes a no-op instead of touching a half-dismantled
// store.
stopped bool
headPeer *ephemeralPeer
tailPeer *ephemeralPeer
peersLock sync.Mutex
timer *time.Timer
lifeTime time.Duration
cleanupWindow time.Duration
// initialLoadDelay returns the wall-clock delay to wait before
// running the post-restart catch-up query. Pluggable so tests can
// fire the load immediately.
initialLoadDelay func() time.Duration
// bgCtx is the long-lived context captured at LoadInitialPeers
// time. Timer-driven sweeps use it because they fire long after
// the original gRPC handler ctx that produced an OnPeerDisconnected
// call has been cancelled.
bgCtx context.Context
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
// no-op when the receiver is nil so deployments without an app
// metrics provider work unchanged.
@@ -91,265 +58,228 @@ type EphemeralManager struct {
// NewEphemeralManager instantiate new EphemeralManager
func NewEphemeralManager(store store.Store, peersManager peers.Manager) *EphemeralManager {
return &EphemeralManager{
store: store,
peersManager: peersManager,
accounts: make(map[string]*accountEntry),
lifeTime: ephemeral.EphemeralLifeTime,
cleanupWindow: cleanupWindow,
initialLoadDelay: defaultInitialLoadDelay,
store: store,
peersManager: peersManager,
lifeTime: ephemeral.EphemeralLifeTime,
cleanupWindow: cleanupWindow,
}
}
// SetMetrics attaches a metrics collector. Pass nil to detach.
// SetMetrics attaches a metrics collector. Safe to call once before
// LoadInitialPeers; later attachment is fine but earlier loads won't be
// reflected in the gauge. Pass nil to detach.
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
e.accountsLock.Lock()
e.peersLock.Lock()
e.metrics = m
e.accountsLock.Unlock()
e.peersLock.Unlock()
}
// LoadInitialPeers schedules the post-restart catch-up query for a
// random moment 8-10 minutes from now. Returns immediately. The
// catch-up populates the per-account tracker from the database so any
// peers that disconnected before the restart still get cleaned up.
//
// The random delay is critical: without it, every management replica
// hitting the same Postgres instance after a deploy would issue the
// catch-up query simultaneously.
// LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head
// of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new
// head.
func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
if e.stopped {
e.peersLock.Lock()
defer e.peersLock.Unlock()
e.loadEphemeralPeers(ctx)
if e.headPeer != nil {
e.timer = time.AfterFunc(e.lifeTime, func() {
e.cleanup(ctx)
})
}
}
// Stop timer
func (e *EphemeralManager) Stop() {
e.peersLock.Lock()
defer e.peersLock.Unlock()
if e.timer != nil {
e.timer.Stop()
}
}
// OnPeerConnected remove the peer from the linked list of ephemeral peers. Because it has been called when the peer
// is active the manager will not delete it while it is active.
func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Peer) {
if !peer.Ephemeral {
return
}
e.bgCtx = ctx
log.WithContext(ctx).Tracef("remove peer from ephemeral list: %s", peer.ID)
delay := e.initialLoadDelay()
log.WithContext(ctx).Infof("ephemeral peer initial load scheduled in %s", delay)
e.initialLoadTimer = time.AfterFunc(delay, func() {
e.loadInitialAccounts(e.bgCtx)
})
}
e.peersLock.Lock()
defer e.peersLock.Unlock()
// Stop cancels the deferred initial load and any per-account timers.
func (e *EphemeralManager) Stop() {
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
e.stopped = true
if e.initialLoadTimer != nil {
e.initialLoadTimer.Stop()
e.initialLoadTimer = nil
if e.removePeer(peer.ID) {
e.metrics.DecPending(1)
}
for _, entry := range e.accounts {
if entry.timer != nil {
entry.timer.Stop()
}
// stop the unnecessary timer
if e.headPeer == nil && e.timer != nil {
e.timer.Stop()
e.timer = nil
}
e.accounts = make(map[string]*accountEntry)
}
// OnPeerConnected is a no-op in the account-scoped design. The sweep
// query filters out connected peers at the database level, so we don't
// need an explicit "remove from list" signal when a peer reconnects.
// Kept on the interface to preserve the existing call sites.
func (e *EphemeralManager) OnPeerConnected(_ context.Context, _ *nbpeer.Peer) {
}
// OnPeerDisconnected registers a disconnect for the peer's account and
// arms a sweep if one isn't already scheduled. Non-ephemeral peers are
// ignored.
// OnPeerDisconnected add the peer to the linked list of ephemeral peers. Because of the peer
// is inactive it will be deleted after the EphemeralLifeTime period.
func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.Peer) {
if !peer.Ephemeral {
return
}
now := timeNow()
log.WithContext(ctx).Tracef("add peer to ephemeral list: %s", peer.ID)
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
if e.stopped {
e.peersLock.Lock()
defer e.peersLock.Unlock()
if e.isPeerOnList(peer.ID) {
return
}
entry, existed := e.accounts[peer.AccountID]
if !existed {
entry = &accountEntry{}
e.accounts[peer.AccountID] = entry
e.metrics.IncPending()
}
entry.lastDisconnectedAt = now
if entry.timer == nil {
delay := e.lifeTime + e.cleanupWindow
log.WithContext(ctx).Tracef("ephemeral: scheduling sweep for account %s in %s", peer.AccountID, delay)
accountID := peer.AccountID
entry.timer = time.AfterFunc(delay, func() {
e.sweep(e.bgCtxOrFallback(ctx), accountID)
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
e.metrics.IncPending()
if e.timer == nil {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
delay = 0
}
e.timer = time.AfterFunc(delay, func() {
e.cleanup(ctx)
})
}
}
// bgCtxOrFallback returns the long-lived background context captured at
// LoadInitialPeers time, falling back to the supplied ctx when the
// manager hasn't been started through LoadInitialPeers (e.g. in tests
// that drive the manager directly). Must be called with the lock held
// or before the timer is armed.
func (e *EphemeralManager) bgCtxOrFallback(ctx context.Context) context.Context {
if e.bgCtx != nil {
return e.bgCtx
func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
peers, err := e.store.GetAllEphemeralPeers(ctx, store.LockingStrengthNone)
if err != nil {
log.WithContext(ctx).Debugf("failed to load ephemeral peers: %s", err)
return
}
return ctx
t := e.newDeadLine()
for _, p := range peers {
e.addPeer(p.AccountID, p.ID, t)
}
e.metrics.AddPending(int64(len(peers)))
log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers))
}
// loadInitialAccounts runs the post-restart catch-up query and seeds
// the tracker with one entry per account that has at least one
// disconnected ephemeral peer.
func (e *EphemeralManager) loadInitialAccounts(ctx context.Context) {
accounts, err := e.store.GetEphemeralAccountsLastDisconnect(ctx)
if err != nil {
log.WithContext(ctx).Errorf("failed to load ephemeral accounts on startup: %v", err)
return
}
func (e *EphemeralManager) cleanup(ctx context.Context) {
log.Tracef("on ephemeral cleanup")
deletePeers := make(map[string]*ephemeralPeer)
e.peersLock.Lock()
now := timeNow()
added := 0
for p := e.headPeer; p != nil; p = p.next {
if now.Before(p.deadline) {
break
}
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
if e.stopped {
return
deletePeers[p.id] = p
e.headPeer = p.next
if p.next == nil {
e.tailPeer = nil
}
}
for accountID, lastDisc := range accounts {
// If we already learned about this account via an
// OnPeerDisconnected that arrived during the random delay
// window, prefer the live timestamp.
if _, alreadyTracked := e.accounts[accountID]; alreadyTracked {
if e.headPeer != nil {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
delay = 0
}
e.timer = time.AfterFunc(delay, func() {
e.cleanup(ctx)
})
} else {
e.timer = nil
}
e.peersLock.Unlock()
// Drop the gauge by the number of entries we just took off the list,
// regardless of whether the subsequent DeletePeers call succeeds. The
// list invariant is what the gauge tracks; failed delete batches are
// counted separately via CountCleanupError so we can still see them.
if len(deletePeers) > 0 {
e.metrics.CountCleanupRun()
e.metrics.DecPending(int64(len(deletePeers)))
}
peerIDsPerAccount := make(map[string][]string)
for id, p := range deletePeers {
peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id)
}
for accountID, peerIDs := range peerIDsPerAccount {
log.WithContext(ctx).Tracef("cleanup: deleting %d ephemeral peers for account %s", len(peerIDs), accountID)
err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true)
if err != nil {
log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err)
e.metrics.CountCleanupError()
continue
}
e.metrics.CountPeersCleaned(int64(len(peerIDs)))
}
}
entry := &accountEntry{lastDisconnectedAt: lastDisc}
horizon := lastDisc.Add(e.lifeTime)
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
ep := &ephemeralPeer{
id: peerID,
accountID: accountID,
deadline: deadline,
}
var delay time.Duration
if horizon.After(now) {
delay = horizon.Sub(now) + e.cleanupWindow
} else {
// Already past the staleness window — sweep right away
// (one cleanupWindow later, to keep startup load smooth
// when many accounts qualify at once).
delay = e.cleanupWindow
if e.headPeer == nil {
e.headPeer = ep
}
if e.tailPeer != nil {
e.tailPeer.next = ep
}
e.tailPeer = ep
}
// removePeer drops the entry from the linked list. Returns true if a
// matching entry was found and removed so callers can keep the pending
// metric gauge in sync.
func (e *EphemeralManager) removePeer(id string) bool {
if e.headPeer == nil {
return false
}
if e.headPeer.id == id {
e.headPeer = e.headPeer.next
if e.tailPeer.id == id {
e.tailPeer = nil
}
idForClosure := accountID
entry.timer = time.AfterFunc(delay, func() {
e.sweep(ctx, idForClosure)
})
e.accounts[accountID] = entry
added++
return true
}
e.metrics.AddPending(int64(added))
log.WithContext(ctx).Debugf("ephemeral: loaded %d account(s) for cleanup tracking", added)
}
// sweep runs the cleanup pass for a single account. It queries the
// database for disconnected ephemeral peers that have crossed the
// staleness window, deletes them via peers.Manager, and then decides
// whether to drop the account from the tracker or re-arm the timer.
func (e *EphemeralManager) sweep(ctx context.Context, accountID string) {
now := timeNow()
e.accountsLock.Lock()
entry, ok := e.accounts[accountID]
if !ok || e.stopped {
e.accountsLock.Unlock()
return
}
lastDisc := entry.lastDisconnectedAt
entry.timer = nil
e.accountsLock.Unlock()
threshold := now.Add(-e.lifeTime)
stalePeerIDs, err := e.store.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, threshold)
if err != nil {
log.WithContext(ctx).Errorf("ephemeral: failed to query stale peers for account %s: %v", accountID, err)
e.metrics.CountCleanupError()
e.rearm(ctx, accountID, e.cleanupWindow)
return
}
if len(stalePeerIDs) > 0 {
log.WithContext(ctx).Tracef("ephemeral: deleting %d peer(s) for account %s", len(stalePeerIDs), accountID)
if err := e.peersManager.DeletePeers(ctx, accountID, stalePeerIDs, activity.SystemInitiator, true); err != nil {
log.WithContext(ctx).Errorf("ephemeral: failed to delete peers for account %s: %v", accountID, err)
e.metrics.CountCleanupError()
e.rearm(ctx, accountID, e.cleanupWindow)
return
for p := e.headPeer; p.next != nil; p = p.next {
if p.next.id == id {
// if we remove the last element from the chain then set the last-1 as tail
if e.tailPeer.id == id {
e.tailPeer = p
}
p.next = p.next.next
return true
}
e.metrics.CountCleanupRun()
e.metrics.CountPeersCleaned(int64(len(stalePeerIDs)))
}
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
if e.stopped {
return
}
entry, ok = e.accounts[accountID]
if !ok {
return
}
// Drop rule: if every disconnect we've observed has now crossed
// the staleness window, the sweep we just ran saw everything that
// could possibly need cleaning. Dropping is safe — a future
// disconnect will recreate the entry. The check uses the latest
// lastDisc, which may have advanced (concurrently with the sweep
// itself) due to a new OnPeerDisconnected, in which case we
// correctly re-arm.
horizon := entry.lastDisconnectedAt.Add(e.lifeTime)
if !horizon.After(now) {
delete(e.accounts, accountID)
e.metrics.DecPending(1)
log.WithContext(ctx).Tracef("ephemeral: dropping account %s (lastDisc=%s, horizon=%s, now=%s)",
accountID, lastDisc, horizon, now)
return
}
delay := horizon.Sub(now) + e.cleanupWindow
idForClosure := accountID
entry.timer = time.AfterFunc(delay, func() {
e.sweep(ctx, idForClosure)
})
return false
}
// rearm reschedules a sweep `delay` from now. Used after a recoverable
// error in the sweep path so the account doesn't get stuck.
func (e *EphemeralManager) rearm(ctx context.Context, accountID string, delay time.Duration) {
e.accountsLock.Lock()
defer e.accountsLock.Unlock()
if e.stopped {
return
func (e *EphemeralManager) isPeerOnList(id string) bool {
for p := e.headPeer; p != nil; p = p.next {
if p.id == id {
return true
}
}
entry, ok := e.accounts[accountID]
if !ok {
return
}
idForClosure := accountID
entry.timer = time.AfterFunc(delay, func() {
e.sweep(ctx, idForClosure)
})
return false
}
// defaultInitialLoadDelay returns a random duration in
// [initialLoadMinDelay, initialLoadMaxDelay). Process-wide
// math/rand is acceptable here — the delay is purely a smoothing
// jitter, not a security primitive.
func defaultInitialLoadDelay() time.Duration {
span := int64(initialLoadMaxDelay - initialLoadMinDelay)
if span <= 0 {
return initialLoadMinDelay
}
return initialLoadMinDelay + time.Duration(rand.Int63n(span))
func (e *EphemeralManager) newDeadLine() time.Time {
return timeNow().Add(e.lifeTime)
}

View File

@@ -2,544 +2,299 @@ package manager
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/golang/mock/gomock"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/management/internals/modules/peers"
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
nbAccount "github.com/netbirdio/netbird/management/server/account"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/route"
)
// MockStore is a thin in-memory stand-in that implements only the two
// methods the EphemeralManager uses. It honors the account / ephemeral
// / connected / lastSeen attributes of each peer so the cleanup logic
// can be exercised end-to-end without bringing up sqlite or Postgres.
type MockStore struct {
store.Store
mu sync.Mutex
account *types.Account
}
func (s *MockStore) GetStaleEphemeralPeerIDsForAccount(_ context.Context, accountID string, olderThan time.Time) ([]string, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.account == nil || s.account.Id != accountID {
return nil, nil
}
var ids []string
for _, p := range s.account.Peers {
if !p.Ephemeral {
continue
}
if p.Status == nil || p.Status.Connected {
continue
}
if p.Status.LastSeen.Before(olderThan) {
ids = append(ids, p.ID)
func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStrength) ([]*nbpeer.Peer, error) {
var peers []*nbpeer.Peer
for _, v := range s.account.Peers {
if v.Ephemeral {
peers = append(peers, v)
}
}
return ids, nil
return peers, nil
}
func (s *MockStore) GetEphemeralAccountsLastDisconnect(_ context.Context) (map[string]time.Time, error) {
s.mu.Lock()
defer s.mu.Unlock()
out := map[string]time.Time{}
if s.account == nil {
return out, nil
}
var latest time.Time
hasAny := false
for _, p := range s.account.Peers {
if !p.Ephemeral || p.Status == nil || p.Status.Connected {
continue
}
if !hasAny || p.Status.LastSeen.After(latest) {
latest = p.Status.LastSeen
hasAny = true
}
}
if hasAny {
out[s.account.Id] = latest
}
return out, nil
type MockAccountManager struct {
mu sync.Mutex
nbAccount.Manager
store *MockStore
deletePeerCalls int
bufferUpdateCalls map[string]int
wg *sync.WaitGroup
}
// withFakeClock pins timeNow to a settable value for the duration of t.
// Returns a getter and a setter so subtests can advance virtual time.
func withFakeClock(t *testing.T, start time.Time) (get func() time.Time, set func(time.Time)) {
t.Helper()
var mu sync.Mutex
now := start
func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
a.mu.Lock()
defer a.mu.Unlock()
a.deletePeerCalls++
delete(a.store.account.Peers, peerID)
if a.wg != nil {
a.wg.Done()
}
return nil
}
func (a *MockAccountManager) GetDeletePeerCalls() int {
a.mu.Lock()
defer a.mu.Unlock()
return a.deletePeerCalls
}
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
a.mu.Lock()
defer a.mu.Unlock()
if a.bufferUpdateCalls == nil {
a.bufferUpdateCalls = make(map[string]int)
}
a.bufferUpdateCalls[accountID]++
}
func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int {
a.mu.Lock()
defer a.mu.Unlock()
if a.bufferUpdateCalls == nil {
return 0
}
return a.bufferUpdateCalls[accountID]
}
func (a *MockAccountManager) GetStore() store.Store {
return a.store
}
func TestNewManager(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
mu.Lock()
defer mu.Unlock()
return now
return startTime
}
t.Cleanup(func() { timeNow = time.Now })
return func() time.Time {
mu.Lock()
defer mu.Unlock()
return now
}, func(v time.Time) {
mu.Lock()
defer mu.Unlock()
now = v
}
}
// newManagerForTest builds a manager with short timers and no random
// initial-load delay so tests run instantly.
func newManagerForTest(t *testing.T, st store.Store, peersMgr peers.Manager) *EphemeralManager {
t.Helper()
mgr := NewEphemeralManager(st, peersMgr)
mgr.lifeTime = 100 * time.Millisecond
mgr.cleanupWindow = 10 * time.Millisecond
mgr.initialLoadDelay = func() time.Duration { return 0 }
t.Cleanup(mgr.Stop)
return mgr
}
// TestOnPeerDisconnected_RegistersAndSweeps drives the OnPeerDisconnected
// path with a fake clock: a single ephemeral peer disconnects, we
// advance past the staleness window, and the sweep deletes it.
func TestOnPeerDisconnected_RegistersAndSweeps(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
getNow, setNow := withFakeClock(t, time.Now())
store := &MockStore{}
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
peersManager := peers.NewMockManager(ctrl)
var deletedMu sync.Mutex
var deleted []string
var deleteCalls atomic.Int32
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, accountID string, peerIDs []string, _ string, _ bool) error {
deleteCalls.Add(1)
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
numberOfPeers := 5
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
// Expect DeletePeers to be called for ephemeral peers
peersManager.EXPECT().
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
for _, peerID := range peerIDs {
delete(store.account.Peers, peerID)
}
mockStore.mu.Unlock()
deletedMu.Lock()
deleted = append(deleted, peerIDs...)
deletedMu.Unlock()
return nil
}).AnyTimes()
}).
AnyTimes()
mgr := newManagerForTest(t, mockStore, peersMgr)
mgr := NewEphemeralManager(store, peersManager)
mgr.loadEphemeralPeers(context.Background())
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
mgr.cleanup(context.Background())
// One ephemeral peer that disconnected "now".
now := getNow()
p := &nbpeer.Peer{
ID: "p1",
AccountID: "acc-1",
Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
if len(store.account.Peers) != numberOfPeers {
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", numberOfPeers, len(store.account.Peers))
}
mockStore.account.Peers[p.ID] = p
mgr.OnPeerDisconnected(context.Background(), p)
// Advance past lifeTime + cleanupWindow so the timer-driven sweep fires.
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
require.Eventually(t, func() bool { return deleteCalls.Load() >= 1 }, 2*time.Second, 5*time.Millisecond,
"sweep should fire and delete the stale peer")
deletedMu.Lock()
deletedCopy := append([]string(nil), deleted...)
deletedMu.Unlock()
require.Equal(t, []string{"p1"}, deletedCopy, "only the one ephemeral peer should be deleted")
}
// TestOnPeerDisconnected_NonEphemeralIgnored: a non-ephemeral disconnect
// must not register the account or arm any timer.
func TestOnPeerDisconnected_NonEphemeralIgnored(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
withFakeClock(t, time.Now())
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
// No DeletePeers expectation — must not be called.
mgr := newManagerForTest(t, mockStore, peersMgr)
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
ID: "p1",
AccountID: "acc-1",
Ephemeral: false,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
func TestNewManagerPeerConnected(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
mgr.accountsLock.Lock()
require.Empty(t, mgr.accounts, "non-ephemeral disconnect must not register an account")
mgr.accountsLock.Unlock()
store := &MockStore{}
ctrl := gomock.NewController(t)
peersManager := peers.NewMockManager(ctrl)
numberOfPeers := 5
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
// Expect DeletePeers to be called for ephemeral peers (except the connected one)
peersManager.EXPECT().
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
for _, peerID := range peerIDs {
delete(store.account.Peers, peerID)
}
return nil
}).
AnyTimes()
mgr := NewEphemeralManager(store, peersManager)
mgr.loadEphemeralPeers(context.Background())
mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
mgr.cleanup(context.Background())
expected := numberOfPeers + 1
if len(store.account.Peers) != expected {
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
}
}
// TestSweep_DropsAccountWhenIdle: after a sweep cleans the stale peers,
// if no more disconnects have arrived the account must be dropped from
// the in-memory tracker.
func TestSweep_DropsAccountWhenIdle(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
getNow, setNow := withFakeClock(t, time.Now())
func TestNewManagerPeerDisconnected(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
store := &MockStore{}
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
peersManager := peers.NewMockManager(ctrl)
numberOfPeers := 5
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
// Expect DeletePeers to be called for the one disconnected peer
peersManager.EXPECT().
DeletePeers(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), true).
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
for _, peerID := range peerIDs {
delete(store.account.Peers, peerID)
}
mockStore.mu.Unlock()
return nil
}).AnyTimes()
}).
AnyTimes()
mgr := newManagerForTest(t, mockStore, peersMgr)
mgr := NewEphemeralManager(store, peersManager)
mgr.loadEphemeralPeers(context.Background())
for _, v := range store.account.Peers {
mgr.OnPeerConnected(context.Background(), v)
now := getNow()
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
mockStore.account.Peers[p.ID] = p
mgr.OnPeerDisconnected(context.Background(), p)
}
mgr.OnPeerDisconnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
startTime = startTime.Add(ephemeral.EphemeralLifeTime + 1)
mgr.cleanup(context.Background())
require.Eventually(t, func() bool {
mgr.accountsLock.Lock()
defer mgr.accountsLock.Unlock()
return len(mgr.accounts) == 0
}, 2*time.Second, 5*time.Millisecond, "account should be dropped after sweep with no new disconnects")
expected := numberOfPeers + numberOfEphemeralPeers - 1
if len(store.account.Peers) != expected {
t.Errorf("failed to cleanup ephemeral peers, expected: %d, result: %d", expected, len(store.account.Peers))
}
}
// TestSweep_ReArmsWhenNewDisconnectArrived: simulate the race where a
// fresh disconnect arrives just before the sweep fires. The sweep must
// observe the updated lastDisc and re-arm rather than drop.
func TestSweep_ReArmsWhenNewDisconnectArrived(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
getNow, setNow := withFakeClock(t, time.Now())
func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
const (
ephemeralPeers = 10
testLifeTime = 1 * time.Second
testCleanupWindow = 100 * time.Millisecond
)
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
mockStore := &MockStore{}
account := newAccountWithId(context.Background(), "account", "", "", false)
mockStore.account = account
wg := &sync.WaitGroup{}
wg.Add(ephemeralPeers)
mockAM := &MockAccountManager{
store: mockStore,
wg: wg,
}
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
peersManager := peers.NewMockManager(ctrl)
// Set up expectation that DeletePeers will be called once with all peer IDs
peersManager.EXPECT().
DeletePeers(gomock.Any(), account.Id, gomock.Any(), gomock.Any(), true).
DoAndReturn(func(ctx context.Context, accountID string, peerIDs []string, userID string, checkConnected bool) error {
// Simulate the actual deletion behavior
for _, peerID := range peerIDs {
err := mockAM.DeletePeer(ctx, accountID, peerID, userID)
if err != nil {
return err
}
}
mockStore.mu.Unlock()
mockAM.BufferUpdateAccountPeers(ctx, accountID, types.UpdateReason{})
return nil
}).AnyTimes()
}).
Times(1)
mgr := newManagerForTest(t, mockStore, peersMgr)
mgr := NewEphemeralManager(mockStore, peersManager)
mgr.lifeTime = testLifeTime
mgr.cleanupWindow = testCleanupWindow
now := getNow()
p1 := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now}}
mockStore.account.Peers[p1.ID] = p1
mgr.OnPeerDisconnected(context.Background(), p1)
// Advance most of the way toward the first sweep, then introduce
// a fresh disconnect that resets lastDisc.
setNow(now.Add(mgr.lifeTime - 10*time.Millisecond))
p2 := &nbpeer.Peer{ID: "p2", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
mockStore.account.Peers[p2.ID] = p2
mgr.OnPeerDisconnected(context.Background(), p2)
// Push past p1's staleness so the first sweep runs and cleans p1
// but observes p2 already on the account entry. It must re-arm.
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
require.Eventually(t, func() bool {
mockStore.mu.Lock()
defer mockStore.mu.Unlock()
_, gone := mockStore.account.Peers["p1"]
return !gone
}, 2*time.Second, 5*time.Millisecond, "p1 should be cleaned at the first sweep")
// The account should still be tracked because p2 is younger than lifeTime
// from the sweep's vantage point at this moment.
mgr.accountsLock.Lock()
_, stillTracked := mgr.accounts["acc-1"]
mgr.accountsLock.Unlock()
require.True(t, stillTracked, "account should remain tracked because p2's disconnect kept it active")
// Push past p2's staleness; second sweep cleans p2 and drops the account.
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
require.Eventually(t, func() bool {
mgr.accountsLock.Lock()
defer mgr.accountsLock.Unlock()
return len(mgr.accounts) == 0
}, 2*time.Second, 5*time.Millisecond, "account should drop after the final sweep")
}
// TestSweep_BatchesPeersPerAccount: many ephemeral peers disconnect on
// the same account; a single sweep must delete them all in one
// DeletePeers call.
func TestSweep_BatchesPeersPerAccount(t *testing.T) {
const ephemeralPeers = 8
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
getNow, setNow := withFakeClock(t, time.Now())
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
deleteBatches := make(chan []string, 4)
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
cp := append([]string(nil), peerIDs...)
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
}
mockStore.mu.Unlock()
deleteBatches <- cp
return nil
}).Times(1)
mgr := newManagerForTest(t, mockStore, peersMgr)
now := getNow()
for i := 0; i < ephemeralPeers; i++ {
id := fmt.Sprintf("p-%d", i)
// Stagger by a fraction of cleanupWindow so they all fall on
// the same sweep tick.
when := now.Add(time.Duration(i) * time.Millisecond)
p := &nbpeer.Peer{ID: id, AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: when}}
mockStore.account.Peers[id] = p
// Add peers and disconnect them at slightly different times (within cleanup window)
for i := range ephemeralPeers {
p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true}
mockStore.account.Peers[p.ID] = p
mgr.OnPeerDisconnected(context.Background(), p)
startTime = startTime.Add(testCleanupWindow / (ephemeralPeers * 2))
}
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
// Advance time past the lifetime to trigger cleanup
startTime = startTime.Add(testLifeTime + testCleanupWindow)
select {
case batch := <-deleteBatches:
require.Len(t, batch, ephemeralPeers, "all peers should be deleted in a single batch")
case <-time.After(2 * time.Second):
t.Fatal("expected one batched DeletePeers call")
}
// Wait for all deletions to complete
wg.Wait()
assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime")
assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once")
assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted all peers")
}
// TestLoadInitialAccounts_SeedsFromStore exercises the post-restart
// catch-up path: pre-populate the store, point the manager at it, and
// confirm both already-stale and not-yet-stale peers get cleaned at
// their proper times.
func TestLoadInitialAccounts_SeedsFromStore(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
getNow, setNow := withFakeClock(t, time.Now())
func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) {
store.account = newAccountWithId(context.Background(), "my account", "", "", false)
now := getNow()
// p-stale: already past the staleness window when load runs.
mockStore.account.Peers["p-stale"] = &nbpeer.Peer{
ID: "p-stale", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now.Add(-time.Hour)},
}
// p-fresh: disconnected but not yet stale.
mockStore.account.Peers["p-fresh"] = &nbpeer.Peer{
ID: "p-fresh", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: now},
for i := 0; i < numberOfPeers; i++ {
peerId := fmt.Sprintf("peer_%d", i)
p := &nbpeer.Peer{
ID: peerId,
Ephemeral: false,
}
store.account.Peers[p.ID] = p
}
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
}
mockStore.mu.Unlock()
return nil
}).AnyTimes()
mgr := newManagerForTest(t, mockStore, peersMgr)
// Drive loadInitialAccounts directly with the fake-clock-aware now.
mgr.loadInitialAccounts(context.Background())
// First sweep should fire shortly (cleanupWindow) for the stale peer.
setNow(now.Add(5 * mgr.cleanupWindow))
require.Eventually(t, func() bool {
mockStore.mu.Lock()
defer mockStore.mu.Unlock()
_, gone := mockStore.account.Peers["p-stale"]
return !gone
}, 2*time.Second, 5*time.Millisecond, "p-stale should be deleted on the first sweep")
// p-fresh is not yet stale; advance past its window.
setNow(now.Add(mgr.lifeTime + 5*mgr.cleanupWindow))
require.Eventually(t, func() bool {
mockStore.mu.Lock()
defer mockStore.mu.Unlock()
_, gone := mockStore.account.Peers["p-fresh"]
return !gone
}, 2*time.Second, 5*time.Millisecond, "p-fresh should be deleted once it crosses the staleness window")
}
// TestStop_CancelsPendingWork verifies that Stop() cancels both the
// deferred initial load and per-account sweep timers and that
// subsequent OnPeerDisconnected calls are ignored.
func TestStop_CancelsPendingWork(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
withFakeClock(t, time.Now())
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
// DeletePeers must NOT be called after Stop.
mgr := NewEphemeralManager(mockStore, peersMgr)
mgr.lifeTime = 100 * time.Millisecond
mgr.cleanupWindow = 10 * time.Millisecond
// Use a long delay so the initial-load timer is still pending.
mgr.initialLoadDelay = func() time.Duration { return time.Hour }
mgr.LoadInitialPeers(context.Background())
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
ID: "p1", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
})
mgr.accountsLock.Lock()
require.NotNil(t, mgr.initialLoadTimer, "initial-load timer should be armed")
require.Len(t, mgr.accounts, 1, "account should be tracked after disconnect")
mgr.accountsLock.Unlock()
mgr.Stop()
mgr.accountsLock.Lock()
require.Empty(t, mgr.accounts, "Stop should clear tracked accounts")
require.True(t, mgr.stopped, "stopped flag must be set")
mgr.accountsLock.Unlock()
// Post-stop disconnect must be ignored.
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
ID: "p2", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
})
mgr.accountsLock.Lock()
require.Empty(t, mgr.accounts, "disconnects after Stop must be ignored")
mgr.accountsLock.Unlock()
}
// TestOnPeerConnected_IsNoop: the OnPeerConnected hook is preserved on
// the interface but does nothing in the per-account model — the sweep
// query filters connected peers at the DB level.
func TestOnPeerConnected_IsNoop(t *testing.T) {
mockStore := &MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)}
withFakeClock(t, time.Now())
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
mgr := newManagerForTest(t, mockStore, peersMgr)
mgr.OnPeerDisconnected(context.Background(), &nbpeer.Peer{
ID: "p1", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: timeNow()},
})
mgr.accountsLock.Lock()
require.Len(t, mgr.accounts, 1, "disconnect should track the account")
mgr.accountsLock.Unlock()
mgr.OnPeerConnected(context.Background(), &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true})
mgr.accountsLock.Lock()
require.Len(t, mgr.accounts, 1, "OnPeerConnected must be a no-op")
mgr.accountsLock.Unlock()
}
// TestSweep_StoreErrorReArms: if the stale-peer query fails, the
// account must remain tracked and a follow-up sweep gets scheduled.
func TestSweep_StoreErrorReArms(t *testing.T) {
mockStore := &erroringStore{
MockStore: MockStore{account: newAccountWithId(context.Background(), "acc-1", "", "", false)},
}
getNow, setNow := withFakeClock(t, time.Now())
ctrl := gomock.NewController(t)
peersMgr := peers.NewMockManager(ctrl)
mgr := newManagerForTest(t, mockStore, peersMgr)
p := &nbpeer.Peer{ID: "p1", AccountID: "acc-1", Ephemeral: true,
Status: &nbpeer.PeerStatus{Connected: false, LastSeen: getNow()}}
mockStore.account.Peers[p.ID] = p
mgr.OnPeerDisconnected(context.Background(), p)
mockStore.fail.Store(true)
setNow(getNow().Add(mgr.lifeTime + 5*mgr.cleanupWindow))
// Wait until the failing sweep has run at least once.
require.Eventually(t, func() bool { return mockStore.failedCalls.Load() >= 1 },
2*time.Second, 5*time.Millisecond, "expected at least one failing sweep")
mgr.accountsLock.Lock()
_, stillTracked := mgr.accounts["acc-1"]
mgr.accountsLock.Unlock()
require.True(t, stillTracked, "account must remain tracked after a sweep error")
// Recover and ensure the rearmed sweep cleans up.
peersMgr.EXPECT().
DeletePeers(gomock.Any(), "acc-1", gomock.Any(), gomock.Any(), true).
DoAndReturn(func(_ context.Context, _ string, peerIDs []string, _ string, _ bool) error {
mockStore.mu.Lock()
for _, id := range peerIDs {
delete(mockStore.account.Peers, id)
}
mockStore.mu.Unlock()
return nil
}).AnyTimes()
mockStore.fail.Store(false)
require.Eventually(t, func() bool {
mockStore.mu.Lock()
defer mockStore.mu.Unlock()
_, gone := mockStore.account.Peers["p1"]
return !gone
}, 2*time.Second, 5*time.Millisecond, "rearmed sweep should clean up after the store recovers")
}
// erroringStore is a MockStore that can be flipped into a failing mode
// to exercise the sweep's error-rearm path.
type erroringStore struct {
MockStore
fail atomic.Bool
failedCalls atomic.Int32
}
func (s *erroringStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
if s.fail.Load() {
s.failedCalls.Add(1)
return nil, errors.New("synthetic store error")
}
return s.MockStore.GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan)
}
// TestDefaultInitialLoadDelay confirms the jitter falls inside the
// documented [8m, 10m) range — sanity check for the production timer.
func TestDefaultInitialLoadDelay(t *testing.T) {
for i := 0; i < 1000; i++ {
d := defaultInitialLoadDelay()
assert.GreaterOrEqual(t, d, initialLoadMinDelay)
assert.Less(t, d, initialLoadMaxDelay)
for i := 0; i < numberOfEphemeralPeers; i++ {
peerId := fmt.Sprintf("ephemeral_peer_%d", i)
p := &nbpeer.Peer{
ID: peerId,
Ephemeral: true,
}
store.account.Peers[p.ID] = p
}
}
@@ -596,7 +351,3 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain string, dis
}
return acc
}
// silence the import "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
// (still needed indirectly for ephemeral.EphemeralLifeTime in production paths).
var _ = ephemeral.EphemeralLifeTime

View File

@@ -6,9 +6,11 @@ import (
"net/netip"
"net/url"
"strings"
"time"
log "github.com/sirupsen/logrus"
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
@@ -185,9 +187,38 @@ func ToSyncResponse(ctx context.Context, config *nbconfig.Config, httpConfig *nb
response.NetworkMap.SshAuth = &proto.SSHAuth{AuthorizedUsers: hashedUsers, MachineUsers: machineUsers, UserIDClaim: userIDClaim}
}
// settings == nil → field stays nil → "no info in this snapshot", client
// preserves the deadline it already had. settings non-nil → emit either a
// valid deadline or the explicit-zero "disabled" sentinel via
// encodeSessionExpiresAt.
if settings != nil {
response.SessionExpiresAt = encodeSessionExpiresAt(
peer.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration),
)
}
return response
}
// encodeSessionExpiresAt encodes a server-side deadline into the 3-state wire
// representation used on LoginResponse, SyncResponse and
// ExtendAuthSessionResponse. See the proto comments on those messages.
//
// - deadline.IsZero() → returns &Timestamp{} (seconds=0, nanos=0): the
// "expiry disabled or peer is not SSO-tracked" sentinel; the client clears
// its anchor.
// - deadline non-zero → returns timestamppb.New(deadline): the new absolute
// UTC deadline.
//
// Returning nil ("no info, preserve client's anchor") is the caller's job —
// only meaningful on Sync builds where settings were not resolved.
func encodeSessionExpiresAt(deadline time.Time) *timestamppb.Timestamp {
if deadline.IsZero() {
return &timestamppb.Timestamp{}
}
return timestamppb.New(deadline)
}
func buildAuthorizedUsersProto(ctx context.Context, authorizedUsers map[string]map[string]struct{}) ([][]byte, map[string]*proto.MachineUserIndexes) {
userIDToIndex := make(map[string]uint32)
var hashedUsers [][]byte

View File

@@ -5,6 +5,7 @@ import (
"net/netip"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
@@ -200,3 +201,29 @@ func TestBuildJWTConfig_Audiences(t *testing.T) {
})
}
}
// TestEncodeSessionExpiresAt pins the wire encoding the client's
// applySessionDeadline depends on:
//
// - zero deadline → &Timestamp{} (seconds=0, nanos=0): the explicit
// "expiry disabled or peer is not SSO-tracked" sentinel.
// - non-zero → timestamppb.New(deadline): the absolute UTC deadline.
//
// The third state (nil pointer = "no info in this snapshot") is the caller's
// responsibility on the Sync path when settings could not be resolved; the
// helper itself never returns nil.
func TestEncodeSessionExpiresAt(t *testing.T) {
t.Run("zero deadline encodes as explicit-zero sentinel", func(t *testing.T) {
got := encodeSessionExpiresAt(time.Time{})
assert.NotNil(t, got, "must not return nil; nil means 'no info', not 'disabled'")
assert.Equal(t, int64(0), got.GetSeconds())
assert.Equal(t, int32(0), got.GetNanos())
})
t.Run("non-zero deadline round-trips", func(t *testing.T) {
deadline := time.Date(2030, 1, 2, 3, 4, 5, 0, time.UTC)
got := encodeSessionExpiresAt(deadline)
assert.NotNil(t, got)
assert.True(t, got.AsTime().Equal(deadline))
})
}

View File

@@ -821,6 +821,80 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
}, nil
}
// ExtendAuthSession refreshes the peer's SSO session expiry deadline using a
// fresh JWT. The same JWT validation pipeline as Login is used. The tunnel
// stays up; no network map sync is performed. The new deadline is returned
// in ExtendAuthSessionResponse.SessionExpiresAt.
func (s *Server) ExtendAuthSession(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
extendReq := &proto.ExtendAuthSessionRequest{}
peerKey, err := s.parseRequest(ctx, req, extendReq)
if err != nil {
return nil, err
}
//nolint
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
if accountID, accErr := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String()); accErr == nil {
//nolint
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
}
jwt := extendReq.GetJwtToken()
if jwt == "" {
return nil, status.Errorf(codes.InvalidArgument, "jwt token is required")
}
var userID string
const attempts = 3
for i := 0; i < attempts; i++ {
userID, err = s.validateToken(ctx, peerKey.String(), jwt)
if err == nil {
break
}
if i == attempts-1 {
break
}
log.WithContext(ctx).Warnf("failed validating JWT token while extending session for peer %s: %v. Retrying (idP cache).", peerKey.String(), err)
select {
case <-time.After(200 * time.Millisecond):
case <-ctx.Done():
return nil, ctx.Err()
}
}
if err != nil {
return nil, err
}
if userID == "" {
return nil, status.Errorf(codes.Unauthenticated, "jwt token did not yield a user id")
}
deadline, err := s.accountManager.ExtendPeerSession(ctx, peerKey.String(), userID)
if err != nil {
log.WithContext(ctx).Warnf("failed extending session for peer %s: %v", peerKey.String(), err)
return nil, mapError(ctx, err)
}
// Success path normally returns a non-zero deadline. A defensive zero
// would still encode as the explicit "disabled" sentinel rather than nil,
// so the client clears any stale anchor instead of preserving it.
resp := &proto.ExtendAuthSessionResponse{
SessionExpiresAt: encodeSessionExpiresAt(deadline),
}
wgKey, err := s.secretsManager.GetWGKey()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed processing request")
}
encrypted, err := encryption.EncryptMessage(peerKey, wgKey, resp)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed encrypting response")
}
return &proto.EncryptedMessage{
WgPubKey: wgKey.PublicKey().String(),
Body: encrypted,
}, nil
}
func (s *Server) prepareLoginResponse(ctx context.Context, peer *nbpeer.Peer, netMap *types.NetworkMap, postureChecks []*posture.Checks) (*proto.LoginResponse, error) {
var relayToken *Token
var err error
@@ -844,6 +918,12 @@ func (s *Server) prepareLoginResponse(ctx context.Context, peer *nbpeer.Peer, ne
Checks: toProtocolChecks(ctx, postureChecks),
}
// settings is always non-nil here, so we never emit nil — encoder returns
// either a valid deadline or the explicit-zero "disabled" sentinel.
loginResp.SessionExpiresAt = encodeSessionExpiresAt(
peer.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration),
)
return loginResp, nil
}

View File

@@ -355,7 +355,17 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
oldSettings.LazyConnectionEnabled != newSettings.LazyConnectionEnabled ||
oldSettings.DNSDomain != newSettings.DNSDomain ||
oldSettings.AutoUpdateVersion != newSettings.AutoUpdateVersion ||
oldSettings.AutoUpdateAlways != newSettings.AutoUpdateAlways {
oldSettings.AutoUpdateAlways != newSettings.AutoUpdateAlways ||
oldSettings.PeerLoginExpirationEnabled != newSettings.PeerLoginExpirationEnabled ||
oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration {
// Session deadline is derived from LastLogin + PeerLoginExpiration
// on every Login/Sync response. Without a fan-out push, connected
// peers keep the deadline they received at login time and only see
// the new value after the next unrelated NetworkMap change. Add
// these two fields to the trigger list so admin-side expiry tweaks
// (e.g. shortening from 24h to 1h) reach every connected peer
// within seconds, which is what the proactive-warning feature
// relies on (see client/internal/auth/sessionwatch).
updateAccountPeers = true
}

View File

@@ -109,6 +109,7 @@ type Manager interface {
UpdateAccountSettings(ctx context.Context, accountID, userID string, newSettings *types.Settings) (*types.Settings, error)
UpdateAccountOnboarding(ctx context.Context, accountID, userID string, newOnboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
LoginPeer(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) // used by peer gRPC API
ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) // used by peer gRPC API for ExtendAuthSession
SyncPeer(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) // used by peer gRPC API
GetExternalCacheManager() ExternalCacheManager
GetPostureChecks(ctx context.Context, accountID, postureChecksID, userID string) (*posture.Checks, error)

View File

@@ -1304,6 +1304,21 @@ func (mr *MockManagerMockRecorder) LoginPeer(ctx, login interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoginPeer", reflect.TypeOf((*MockManager)(nil).LoginPeer), ctx, login)
}
// ExtendPeerSession mocks base method.
func (m *MockManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ExtendPeerSession", ctx, peerPubKey, userID)
ret0, _ := ret[0].(time.Time)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ExtendPeerSession indicates an expected call of ExtendPeerSession.
func (mr *MockManagerMockRecorder) ExtendPeerSession(ctx, peerPubKey, userID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtendPeerSession", reflect.TypeOf((*MockManager)(nil).ExtendPeerSession), ctx, peerPubKey, userID)
}
// MarkPeerConnected mocks base method.
func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error {
m.ctrl.T.Helper()

View File

@@ -240,6 +240,10 @@ const (
AccountLocalMfaEnabled Activity = 123
// AccountLocalMfaDisabled indicates that a user disabled TOTP MFA for local users
AccountLocalMfaDisabled Activity = 124
// UserExtendedPeerSession indicates that a user refreshed their peer's
// SSO session deadline via ExtendAuthSession without re-establishing the
// tunnel. Distinct from UserLoggedInPeer (full interactive login).
UserExtendedPeerSession Activity = 125
AccountDeleted Activity = 99999
)
@@ -394,6 +398,8 @@ var activityMap = map[Activity]Code{
AccountLocalMfaEnabled: {"Account local MFA enabled", "account.setting.local.mfa.enable"},
AccountLocalMfaDisabled: {"Account local MFA disabled", "account.setting.local.mfa.disable"},
UserExtendedPeerSession: {"User extended peer session", "user.peer.session.extend"},
DomainAdded: {"Domain added", "domain.add"},
DomainDeleted: {"Domain deleted", "domain.delete"},
DomainValidated: {"Domain validated", "domain.validate"},

View File

@@ -98,6 +98,7 @@ type MockAccountManager struct {
GetPeerFunc func(ctx context.Context, accountID, peerID, userID string) (*nbpeer.Peer, error)
UpdateAccountSettingsFunc func(ctx context.Context, accountID, userID string, newSettings *types.Settings) (*types.Settings, error)
LoginPeerFunc func(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
ExtendPeerSessionFunc func(ctx context.Context, peerPubKey, userID string) (time.Time, error)
SyncPeerFunc func(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
InviteUserFunc func(ctx context.Context, accountID string, initiatorUserID string, targetUserEmail string) error
ApproveUserFunc func(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error)
@@ -860,6 +861,14 @@ func (am *MockAccountManager) LoginPeer(ctx context.Context, login types.PeerLog
return nil, nil, nil, status.Errorf(codes.Unimplemented, "method LoginPeer is not implemented")
}
// ExtendPeerSession mocks ExtendPeerSession of the AccountManager interface
func (am *MockAccountManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
if am.ExtendPeerSessionFunc != nil {
return am.ExtendPeerSessionFunc(ctx, peerPubKey, userID)
}
return time.Time{}, status.Errorf(codes.Unimplemented, "method ExtendPeerSession is not implemented")
}
// SyncPeer mocks SyncPeer of the AccountManager interface
func (am *MockAccountManager) SyncPeer(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
if am.SyncPeerFunc != nil {

View File

@@ -1128,6 +1128,79 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
return p, nmap, pc, err
}
// ExtendPeerSession refreshes the peer's SSO session deadline by updating
// LastLogin after a successful JWT validation. The tunnel is untouched: no
// network map sync, no peer reconnect.
//
// Preconditions enforced here:
// - userID must be present (caller validated the JWT and extracted the user ID).
// - The peer must exist and be SSO-registered (AddedWithSSOLogin) with
// LoginExpirationEnabled.
// - Account-level PeerLoginExpirationEnabled must be true.
// - The JWT user must match peer.UserID (mirrors LoginPeer at peer.go ~1028).
//
// Returns the new absolute UTC deadline.
func (am *DefaultAccountManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
if userID == "" {
return time.Time{}, status.Errorf(status.PermissionDenied, "session extend requires a JWT")
}
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
if err != nil {
return time.Time{}, err
}
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
if err != nil {
return time.Time{}, err
}
if !settings.PeerLoginExpirationEnabled {
return time.Time{}, status.Errorf(status.PreconditionFailed, "peer login expiration is disabled for the account")
}
var refreshed *nbpeer.Peer
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err := transaction.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey)
if err != nil {
return err
}
if !peer.AddedWithSSOLogin() || !peer.LoginExpirationEnabled {
return status.Errorf(status.PreconditionFailed, "peer is not eligible for session extension")
}
if peer.UserID != userID {
log.WithContext(ctx).Warnf("user mismatch when extending session for peer %s: peer user %s, jwt user %s", peer.ID, peer.UserID, userID)
return status.NewPeerLoginMismatchError()
}
peer = peer.UpdateLastLogin()
if err := transaction.SavePeer(ctx, accountID, peer); err != nil {
return err
}
if err := transaction.SaveUserLastLogin(ctx, accountID, userID, peer.GetLastLogin()); err != nil {
log.WithContext(ctx).Debugf("failed to update user last login during session extend: %v", err)
}
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.UserExtendedPeerSession, peer.EventMeta(am.networkMapController.GetDNSDomain(settings)))
refreshed = peer
return nil
})
if err != nil {
return time.Time{}, err
}
// Reschedule the per-account expiration job. schedulePeerLoginExpiration
// is a no-op when a job is already running, but the running job will pick
// up the new LastLogin on its next tick. Calling it here is harmless and
// guarantees a job is in flight even if a prior one ended right before
// the extend.
am.schedulePeerLoginExpiration(ctx, accountID)
return refreshed.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration), nil
}
// getPeerPostureChecks returns the posture checks for the peer.
func getPeerPostureChecks(ctx context.Context, transaction store.Store, accountID, peerID string) ([]*posture.Checks, error) {
policies, err := transaction.GetAccountPolicies(ctx, store.LockingStrengthNone, accountID)

View File

@@ -367,6 +367,22 @@ func (p *Peer) LoginExpired(expiresIn time.Duration) (bool, time.Duration) {
return timeLeft <= 0, timeLeft
}
// SessionExpiresAt returns the absolute UTC instant at which the peer's SSO
// session expires, derived from LastLogin and the account-level
// PeerLoginExpiration setting. Returns the zero value when login expiration
// does not apply (peer not SSO-registered, peer-level toggle off, or account
// expiry disabled). Callers should treat the zero value as "no deadline".
func (p *Peer) SessionExpiresAt(accountExpirationEnabled bool, expiresIn time.Duration) time.Time {
if !accountExpirationEnabled || !p.AddedWithSSOLogin() || !p.LoginExpirationEnabled {
return time.Time{}
}
last := p.GetLastLogin()
if last.IsZero() {
return time.Time{}
}
return last.Add(expiresIn).UTC()
}
// FQDN returns peers FQDN combined of the peer's DNS label and the system's DNS domain
func (p *Peer) FQDN(dnsDomain string) string {
if dnsDomain == "" {

View File

@@ -3463,49 +3463,6 @@ func (s *SqlStore) GetAllEphemeralPeers(ctx context.Context, lockStrength Lockin
return allEphemeralPeers, nil
}
// GetStaleEphemeralPeerIDsForAccount returns IDs of disconnected
// ephemeral peers in the given account whose last_seen is strictly
// older than olderThan.
func (s *SqlStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
var ids []string
err := s.db.WithContext(ctx).
Model(&nbpeer.Peer{}).
Where("account_id = ? AND ephemeral = ? AND peer_status_connected = ? AND peer_status_last_seen < ?",
accountID, true, false, olderThan).
Pluck("id", &ids).Error
if err != nil {
log.WithContext(ctx).Errorf("failed to query stale ephemeral peers for account %s: %v", accountID, err)
return nil, status.Errorf(status.Internal, "query stale ephemeral peers")
}
return ids, nil
}
// GetEphemeralAccountsLastDisconnect returns the latest peer_status_last_seen
// per account across disconnected ephemeral peers. Returns one entry per
// account that has at least one such peer.
func (s *SqlStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
type row struct {
AccountID string
LastSeen time.Time
}
var rows []row
err := s.db.WithContext(ctx).
Model(&nbpeer.Peer{}).
Select("account_id, MAX(peer_status_last_seen) AS last_seen").
Where("ephemeral = ? AND peer_status_connected = ?", true, false).
Group("account_id").
Scan(&rows).Error
if err != nil {
log.WithContext(ctx).Errorf("failed to load ephemeral-account last disconnect map: %v", err)
return nil, status.Errorf(status.Internal, "load ephemeral accounts")
}
out := make(map[string]time.Time, len(rows))
for _, r := range rows {
out[r.AccountID] = r.LastSeen
}
return out, nil
}
// DeletePeer removes a peer from the store.
func (s *SqlStore) DeletePeer(ctx context.Context, accountID string, peerID string) error {
result := s.db.Delete(&nbpeer.Peer{}, accountAndIDQueryCondition, accountID, peerID)

View File

@@ -165,15 +165,6 @@ type Store interface {
GetAccountPeersWithExpiration(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
GetAccountPeersWithInactivity(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*nbpeer.Peer, error)
GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error)
// GetStaleEphemeralPeerIDsForAccount returns the IDs of disconnected
// ephemeral peers whose last_seen is strictly older than olderThan,
// scoped to a single account. Used by the per-account cleanup sweep.
GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error)
// GetEphemeralAccountsLastDisconnect returns, for every account that
// has at least one disconnected ephemeral peer, the most recent
// last_seen across that account's disconnected ephemeral peers. Used
// to reconstruct the per-account cleanup tracker after a restart.
GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error)
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error
// MarkPeerConnectedIfNewerSession sets the peer to connected with the

View File

@@ -1376,36 +1376,6 @@ func (mr *MockStoreMockRecorder) GetAllEphemeralPeers(ctx, lockStrength interfac
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllEphemeralPeers", reflect.TypeOf((*MockStore)(nil).GetAllEphemeralPeers), ctx, lockStrength)
}
// GetStaleEphemeralPeerIDsForAccount mocks base method.
func (m *MockStore) GetStaleEphemeralPeerIDsForAccount(ctx context.Context, accountID string, olderThan time.Time) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetStaleEphemeralPeerIDsForAccount", ctx, accountID, olderThan)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetStaleEphemeralPeerIDsForAccount indicates an expected call of GetStaleEphemeralPeerIDsForAccount.
func (mr *MockStoreMockRecorder) GetStaleEphemeralPeerIDsForAccount(ctx, accountID, olderThan interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStaleEphemeralPeerIDsForAccount", reflect.TypeOf((*MockStore)(nil).GetStaleEphemeralPeerIDsForAccount), ctx, accountID, olderThan)
}
// GetEphemeralAccountsLastDisconnect mocks base method.
func (m *MockStore) GetEphemeralAccountsLastDisconnect(ctx context.Context) (map[string]time.Time, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetEphemeralAccountsLastDisconnect", ctx)
ret0, _ := ret[0].(map[string]time.Time)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetEphemeralAccountsLastDisconnect indicates an expected call of GetEphemeralAccountsLastDisconnect.
func (mr *MockStoreMockRecorder) GetEphemeralAccountsLastDisconnect(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEphemeralAccountsLastDisconnect", reflect.TypeOf((*MockStore)(nil).GetEphemeralAccountsLastDisconnect), ctx)
}
// GetAllProxyAccessTokens mocks base method.
func (m *MockStore) GetAllProxyAccessTokens(ctx context.Context, lockStrength LockingStrength) ([]*types2.ProxyAccessToken, error) {
m.ctrl.T.Helper()

View File

@@ -7,9 +7,9 @@ import (
)
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
// many accounts are currently being tracked for cleanup, how many sweep
// runs deleted at least one peer, how many peers have been removed, and
// how many delete batches failed.
// many peers are currently scheduled for deletion, how many tick runs
// the cleaner has performed, how many peers it has removed, and how
// many delete batches failed.
type EphemeralPeersMetrics struct {
ctx context.Context
@@ -21,16 +21,16 @@ type EphemeralPeersMetrics struct {
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
pending, err := meter.Int64UpDownCounter("management.ephemeral.accounts.tracked",
pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending",
metric.WithUnit("1"),
metric.WithDescription("Number of accounts currently tracked for ephemeral peer cleanup"))
metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up"))
if err != nil {
return nil, err
}
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral cleanup sweeps that deleted at least one peer"))
metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer"))
if err != nil {
return nil, err
}
@@ -61,8 +61,7 @@ func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*Ephemer
// All methods are nil-receiver safe so callers that haven't wired metrics
// (tests, self-hosted with metrics off) can invoke them unconditionally.
// IncPending bumps the tracked-accounts gauge when a new account
// becomes eligible for ephemeral cleanup tracking.
// IncPending bumps the pending gauge when a peer is added to the cleanup list.
func (m *EphemeralPeersMetrics) IncPending() {
if m == nil {
return
@@ -70,8 +69,8 @@ func (m *EphemeralPeersMetrics) IncPending() {
m.pending.Add(m.ctx, 1)
}
// AddPending bumps the tracked-accounts gauge by n — used at startup
// when the catch-up query seeds the tracker.
// AddPending bumps the pending gauge by n — used at startup when the
// initial set of ephemeral peers is loaded from the store.
func (m *EphemeralPeersMetrics) AddPending(n int64) {
if m == nil || n <= 0 {
return
@@ -79,8 +78,9 @@ func (m *EphemeralPeersMetrics) AddPending(n int64) {
m.pending.Add(m.ctx, n)
}
// DecPending decreases the tracked-accounts gauge when an account is
// dropped from the tracker (no more disconnects to chase).
// DecPending decreases the pending gauge — used both when a peer reconnects
// before its deadline (removed from the list) and when a cleanup tick
// actually deletes it.
func (m *EphemeralPeersMetrics) DecPending(n int64) {
if m == nil || n <= 0 {
return

View File

@@ -16,6 +16,10 @@ type Client interface {
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
Register(setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
Login(sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
// ExtendAuthSession refreshes the peer's SSO session deadline using a fresh JWT.
// Returns the new absolute deadline; zero time when the server reports the peer
// is not eligible for session extension.
ExtendAuthSession(sysInfo *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error)
GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlow() (*proto.PKCEAuthorizationFlow, error)
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)

View File

@@ -607,6 +607,61 @@ func (c *GrpcClient) Login(sysInfo *system.Info, pubSSHKey []byte, dnsLabels dom
return c.login(&proto.LoginRequest{Meta: infoToMetaData(sysInfo), PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
}
// ExtendAuthSession refreshes the peer's SSO session deadline on the management
// server using a freshly issued JWT. The tunnel is untouched: no network map
// sync, no peer reconnect. Returns the new absolute UTC deadline (zero time
// when the server reports the field empty).
func (c *GrpcClient) ExtendAuthSession(sysInfo *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error) {
if !c.ready() {
return nil, errors.New(errMsgNoMgmtConnection)
}
serverKey, err := c.getServerPublicKey()
if err != nil {
return nil, err
}
reqBody, err := encryption.EncryptMessage(*serverKey, c.key, &proto.ExtendAuthSessionRequest{
JwtToken: jwtToken,
Meta: infoToMetaData(sysInfo),
})
if err != nil {
log.Errorf("failed to encrypt extend auth session message: %s", err)
return nil, err
}
var resp *proto.EncryptedMessage
operation := func() error {
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
var err error
resp, err = c.realClient.ExtendAuthSession(mgmCtx, &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: reqBody,
})
if err != nil {
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.Canceled {
return err
}
return backoff.Permanent(err)
}
return nil
}
if err := backoff.Retry(operation, nbgrpc.Backoff(c.ctx)); err != nil {
log.Errorf("failed to extend auth session on Management Service: %v", err)
return nil, err
}
out := &proto.ExtendAuthSessionResponse{}
if err := encryption.DecryptMessage(*serverKey, c.key, resp.Body, out); err != nil {
log.Errorf("failed to decrypt extend auth session response: %s", err)
return nil, err
}
return out, nil
}
// GetDeviceAuthorizationFlow returns a device authorization flow information.
// It also takes care of encrypting and decrypting messages.
func (c *GrpcClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) {

View File

@@ -14,6 +14,7 @@ type MockClient struct {
SyncFunc func(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
RegisterFunc func(setupKey string, jwtToken string, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
LoginFunc func(info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
ExtendAuthSessionFunc func(info *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error)
GetDeviceAuthorizationFlowFunc func() (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlowFunc func() (*proto.PKCEAuthorizationFlow, error)
GetServerURLFunc func() string
@@ -65,6 +66,13 @@ func (m *MockClient) Login(info *system.Info, sshKey []byte, dnsLabels domain.Li
return m.LoginFunc(info, sshKey, dnsLabels)
}
func (m *MockClient) ExtendAuthSession(info *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error) {
if m.ExtendAuthSessionFunc == nil {
return nil, nil
}
return m.ExtendAuthSessionFunc(info, jwtToken)
}
func (m *MockClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) {
if m.GetDeviceAuthorizationFlowFunc == nil {
return nil, nil

File diff suppressed because it is too large Load Diff

View File

@@ -52,6 +52,14 @@ service ManagementService {
// Executes a job on a target peer (e.g., debug bundle)
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
rpc ExtendAuthSession(EncryptedMessage) returns (EncryptedMessage) {}
// CreateExpose creates a temporary reverse proxy service for a peer
rpc CreateExpose(EncryptedMessage) returns (EncryptedMessage) {}
@@ -133,6 +141,15 @@ message SyncResponse {
// Posture checks to be evaluated by client
repeated Checks Checks = 6;
// 3-state session deadline. Carried on every Sync snapshot so admin-side
// changes propagate live without a client reconnect.
// field unset (nil) → snapshot carries no info; client keeps the
// deadline it already had
// set, seconds=0 nanos=0 → explicit "expiry disabled" or peer is not
// SSO-registered; client clears its anchor
// set, valid timestamp → new absolute UTC deadline
google.protobuf.Timestamp sessionExpiresAt = 7;
}
message SyncMetaRequest {
@@ -244,6 +261,31 @@ message LoginResponse {
PeerConfig peerConfig = 2;
// Posture checks to be evaluated by client
repeated Checks Checks = 3;
// 3-state session deadline; same encoding as SyncResponse.sessionExpiresAt.
// field unset (nil) → no info; client keeps any deadline it had
// set, seconds=0 nanos=0 → explicit "expiry disabled" / non-SSO peer
// set, valid timestamp → new absolute UTC deadline
google.protobuf.Timestamp sessionExpiresAt = 4;
}
// ExtendAuthSessionRequest carries a fresh JWT to refresh the peer's session deadline.
// The encrypted body of an EncryptedMessage with this payload is sent to the
// ExtendAuthSession RPC.
message ExtendAuthSessionRequest {
// SSO token (must be a fresh, valid JWT for the peer's owning user)
string jwtToken = 1;
// Meta data of the peer (used for IdP user info refresh consistent with Login)
PeerSystemMeta meta = 2;
}
// ExtendAuthSessionResponse contains the refreshed session deadline.
message ExtendAuthSessionResponse {
// 3-state session deadline; same encoding as SyncResponse.sessionExpiresAt.
// In practice ExtendAuthSession only succeeds for SSO peers with expiry
// enabled, so this carries a valid timestamp on the success path. The
// 3-state encoding is documented here for symmetry with Login/Sync.
google.protobuf.Timestamp sessionExpiresAt = 1;
}
message ServerKeyResponse {

View File

@@ -52,6 +52,13 @@ type ManagementServiceClient interface {
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
ExtendAuthSession(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
// CreateExpose creates a temporary reverse proxy service for a peer
CreateExpose(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
// RenewExpose extends the TTL of an active expose session
@@ -194,6 +201,15 @@ func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
return m, nil
}
func (c *managementServiceClient) ExtendAuthSession(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error) {
out := new(EncryptedMessage)
err := c.cc.Invoke(ctx, "/management.ManagementService/ExtendAuthSession", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managementServiceClient) CreateExpose(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error) {
out := new(EncryptedMessage)
err := c.cc.Invoke(ctx, "/management.ManagementService/CreateExpose", in, out, opts...)
@@ -259,6 +275,13 @@ type ManagementServiceServer interface {
Logout(context.Context, *EncryptedMessage) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ManagementService_JobServer) error
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
ExtendAuthSession(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
// CreateExpose creates a temporary reverse proxy service for a peer
CreateExpose(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
// RenewExpose extends the TTL of an active expose session
@@ -299,6 +322,9 @@ func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMe
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
return status.Errorf(codes.Unimplemented, "method Job not implemented")
}
func (UnimplementedManagementServiceServer) ExtendAuthSession(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method ExtendAuthSession not implemented")
}
func (UnimplementedManagementServiceServer) CreateExpose(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateExpose not implemented")
}
@@ -494,6 +520,24 @@ func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
return m, nil
}
func _ManagementService_ExtendAuthSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagementServiceServer).ExtendAuthSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/management.ManagementService/ExtendAuthSession",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagementServiceServer).ExtendAuthSession(ctx, req.(*EncryptedMessage))
}
return interceptor(ctx, in, info, handler)
}
func _ManagementService_CreateExpose_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
@@ -583,6 +627,10 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
MethodName: "Logout",
Handler: _ManagementService_Logout_Handler,
},
{
MethodName: "ExtendAuthSession",
Handler: _ManagementService_ExtendAuthSession_Handler,
},
{
MethodName: "CreateExpose",
Handler: _ManagementService_CreateExpose_Handler,