Compare commits

..

10 Commits

Author SHA1 Message Date
Zoltán Papp
48b1ab010e make ExtendAuthSession JWT-retry backoff cancellable
Skip the retry log and 200ms wait on the final attempt, and replace the
uncancellable time.Sleep with a select on time.After/ctx.Done so an
upstream cancellation aborts the wait instead of running it to
completion.
2026-05-19 17:35:56 +02:00
Zoltán Papp
cfeb15fe2a add UserExtendedPeerSession activity event
ExtendAuthSession previously reused UserLoggedInPeer for its audit
record, which conflated two distinct user actions: a full interactive
SSO login (tunnel re-established, network map resync) versus an
in-place deadline refresh (tunnel untouched). Auditors reading the log
couldn't tell which one happened, and downstream dashboards/alerts on
"login" volume were polluted by routine extends.

Adds a dedicated UserExtendedPeerSession Activity (code 125,
"user.peer.session.extend") and switches ExtendPeerSession over to it.
The peer-extend audit trail is now distinguishable from interactive
logins.
2026-05-19 14:55:22 +02:00
Zoltán Papp
7de3242a27 encode SessionExpiresAt as 3-state on the wire
Previously the `sessionExpiresAt` field on LoginResponse, SyncResponse
and ExtendAuthSessionResponse was 2-state: a valid timestamp meant
"new deadline", and nil meant "clear". That conflated two distinct
meanings — "no info in this snapshot" vs "expiry is explicitly off /
peer is not SSO-tracked" — so a Sync push that legitimately couldn't
compute the deadline (settings lookup failed) would silently clear the
client's anchor and lose the warning window.

Three states now, encoded on the same field number (no .proto schema
churn — only comments and the server-side encoder change):

  - nil pointer (field absent) → "no info"; client preserves anchor
  - &Timestamp{} (seconds=0, nanos=0) → explicit "disabled / not SSO"
    sentinel; client clears
  - valid timestamp → new absolute UTC deadline

A new encodeSessionExpiresAt helper centralises the zero/non-zero
encoding and is shared by the Sync, Login and ExtendAuthSession
builders. The Sync builder still emits nil when settings are missing.
Login and ExtendAuthSession always carry an authoritative value.

The matching client-side decoder lands on feature/session-extend.
2026-05-19 14:46:27 +02:00
Zoltán Papp
6dcba89a46 add SSO session extend flow (management)
Adds the management-server half of the SSO session-extension feature:

- New ExtendAuthSession gRPC RPC that refreshes a peer's session expiry
  using a fresh JWT, validated through the same pipeline as Login but
  without tearing down the tunnel or redoing the NetworkMap sync.
- Per-peer SessionExpiresAt timestamp on every LoginResponse and
  SyncResponse so connected clients learn the deadline on the existing
  long-lived stream, and admin-side changes (toggling expiration,
  changing the expiration window) reach every peer within seconds.
- SessionExpiresAt(...) helper on Peer that derives the absolute UTC
  deadline from LastLogin + the account-level PeerLoginExpiration
  setting, returning zero when the peer is not SSO-tracked or expiration
  is disabled.

The matching client-side consumer of these fields lands separately.
2026-05-18 23:37:02 +02:00
Maycon Santos
af24fd7796 [management] Add metrics for peer status updates and ephemeral cleanup (#6196)
* [management] Add metrics for peer status updates and ephemeral cleanup

The session-fenced MarkPeerConnected / MarkPeerDisconnected path and
the ephemeral peer cleanup loop both run silently today: when fencing
rejects a stale stream, when a cleanup tick deletes peers, or when a
batch delete fails, we have no operational signal beyond log lines.

Add OpenTelemetry counters and a histogram so the same SLO-style
dashboards that already exist for the network-map controller can cover
peer connect/disconnect and ephemeral cleanup too.

All new attributes are bounded enums: operation in {connect,disconnect}
and outcome in {applied,stale,error,peer_not_found}. No account, peer,
or user ID is ever written as a metric label — total cardinality is
fixed at compile time (8 counter series, 2 histogram series, 4 unlabeled
ephemeral series).

Metric methods are nil-receiver safe so test composition that doesn't
wire telemetry (the bulk of the existing tests) works unchanged. The
ephemeral manager exposes a SetMetrics setter rather than taking the
collector through its constructor, keeping the constructor signature
stable across all test call sites.

* [management] Add OpenTelemetry metrics for ephemeral peer cleanup

Introduce counters for tracking ephemeral peer cleanup, including peers pending deletion, cleanup runs, successful deletions, and failed batches. Metrics are nil-receiver safe to ensure compatibility with test setups without telemetry.
2026-05-18 22:55:19 +02:00
Maycon Santos
13d32d274f [management] Fence peer status updates with a session token (#6193)
* [management] Fence peer status updates with a session token

The connect/disconnect path used a best-effort LastSeen-after-streamStart
comparison to decide whether a status update should land. Under contention
— a re-sync arriving while the previous stream's disconnect was still in
flight, or two management replicas seeing the same peer at once — the
check was a read-then-decide-then-write window: any UPDATE in between
caused the wrong row to be written. The Go-side time.Now() that fed the
comparison also drifted under lock contention, since it was captured
seconds before the write actually committed.

Replace it with an integer-nanosecond fencing token stored alongside the
status. Every gRPC sync stream uses its open time (UnixNano) as its token.
Connects only land when the incoming token is strictly greater than the
stored one; disconnects only land when the incoming token equals the
stored one (i.e. we're the stream that owns the current session). Both
are single optimistic-locked UPDATEs — no read-then-write, no transaction
wrapper.

LastSeen is now written by the database itself (CURRENT_TIMESTAMP). The
caller never supplies it, so the value always reflects the real moment
of the UPDATE rather than the moment the caller queued the work — which
was already off by minutes under heavy lock contention.

Side effects (geo lookup, peer-login-expiration scheduling, network-map
fan-out) are explicitly documented as running after the fence UPDATE
commits, never inside it. Geo also skips the update when realIP equals
the stored ConnectionIP, dropping a redundant SavePeerLocation call on
same-IP reconnects.

Tests cover the three semantic cases (matched disconnect lands, stale
disconnect dropped, stale connect dropped) plus a 16-goroutine race test
that asserts the highest token always wins.

* [management] Add SessionStartedAt to peer status updates

Stored `SessionStartedAt` for fencing token propagation across goroutines and updated database queries/functions to handle the new field. Removed outdated geolocation handling logic and adjusted tests for concurrency safety.

* Rename `peer_status_required_approval` to `peer_status_requires_approval` in SQL store fields
2026-05-18 20:25:12 +02:00
Nicolas Frati
705f87fc20 [management] fix: device redirect uri wasn't registered (#6191)
* fix: device redirect uri wasn't registered

* fix lint
2026-05-18 12:57:59 +02:00
Viktor Liu
3f91f49277 Clean up legacy 32-bit and HKCU registry entries on Windows install (#6176) 2026-05-16 16:52:57 +02:00
Maycon Santos
347c5bf317 Avoid context cancellation in cancelPeerRoutines (#6175)
When closing go routines and handling peer disconnect, we should avoid canceling the flow due to parent gRPC context cancellation.

This change triggers disconnection handling with a context that is not bound to the parent gRPC cancellation.
2026-05-16 16:29:01 +02:00
Viktor Liu
22e2519d71 [management] Avoid peer IP reallocation when account settings update preserves the network range (#6173) 2026-05-16 15:51:48 +02:00
33 changed files with 2374 additions and 1095 deletions

View File

@@ -260,15 +260,23 @@ WriteRegStr ${REG_ROOT} "${UNINSTALL_PATH}" "Publisher" "${COMP_NAME}"
WriteRegStr ${REG_ROOT} "${UI_REG_APP_PATH}" "" "$INSTDIR\${UI_APP_EXE}"
; Create autostart registry entry based on checkbox
; Drop Run, App Paths and Uninstall entries left in the 32-bit registry view
; or HKCU by legacy installers.
DetailPrint "Cleaning legacy 32-bit / HKCU entries..."
DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}"
SetRegView 32
DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}"
DeleteRegKey HKLM "${REG_APP_PATH}"
DeleteRegKey HKLM "${UI_REG_APP_PATH}"
DeleteRegKey HKLM "${UNINSTALL_PATH}"
SetRegView 64
DetailPrint "Autostart enabled: $AutostartEnabled"
${If} $AutostartEnabled == "1"
WriteRegStr HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" '"$INSTDIR\${UI_APP_EXE}.exe"'
DetailPrint "Added autostart registry entry: $INSTDIR\${UI_APP_EXE}.exe"
${Else}
DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}"
; Legacy: pre-HKLM installs wrote to HKCU; clean that up too.
DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}"
DetailPrint "Autostart not enabled by user"
${EndIf}
@@ -299,11 +307,16 @@ ExecWait '"$INSTDIR\${MAIN_APP_EXE}" service uninstall'
DetailPrint "Terminating Netbird UI process..."
ExecWait `taskkill /im ${UI_APP_EXE}.exe /f`
; Remove autostart registry entry
; Remove autostart entries from every view a previous installer may have used.
DetailPrint "Removing autostart registry entry if exists..."
DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}"
; Legacy: pre-HKLM installs wrote to HKCU; clean that up too.
DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}"
SetRegView 32
DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}"
DeleteRegKey HKLM "${REG_APP_PATH}"
DeleteRegKey HKLM "${UI_REG_APP_PATH}"
DeleteRegKey HKLM "${UNINSTALL_PATH}"
SetRegView 64
; Handle data deletion based on checkbox
DetailPrint "Checking if user requested data deletion..."

View File

@@ -64,6 +64,13 @@
<RegistryValue Name="InstalledByMSI" Type="integer" Value="1" KeyPath="yes" />
</RegistryKey>
</Component>
<!-- Drop the HKCU Run\Netbird value written by legacy NSIS installers. -->
<Component Id="NetbirdLegacyHKCUCleanup" Guid="*">
<RegistryValue Root="HKCU" Key="Software\NetBird GmbH\Installer"
Name="LegacyHKCUCleanup" Type="integer" Value="1" KeyPath="yes" />
<RemoveRegistryValue Root="HKCU"
Key="Software\Microsoft\Windows\CurrentVersion\Run" Name="Netbird" />
</Component>
</StandardDirectory>
<StandardDirectory Id="CommonAppDataFolder">
@@ -76,10 +83,28 @@
</Directory>
</StandardDirectory>
<!-- Drop Run, App Paths and Uninstall entries written by legacy NSIS
installers into the 32-bit registry view (HKLM\Software\Wow6432Node). -->
<Component Id="NetbirdLegacyWow6432Cleanup" Directory="NetbirdInstallDir"
Guid="bda5d628-16bd-4086-b2c1-5099d8d51763" Bitness="always32">
<RegistryValue Root="HKLM" Key="Software\NetBird GmbH\Installer"
Name="LegacyWow6432Cleanup" Type="integer" Value="1" KeyPath="yes" />
<RemoveRegistryValue Root="HKLM"
Key="Software\Microsoft\Windows\CurrentVersion\Run" Name="Netbird" />
<RemoveRegistryKey Action="removeOnInstall" Root="HKLM"
Key="Software\Microsoft\Windows\CurrentVersion\App Paths\Netbird" />
<RemoveRegistryKey Action="removeOnInstall" Root="HKLM"
Key="Software\Microsoft\Windows\CurrentVersion\App Paths\Netbird-ui" />
<RemoveRegistryKey Action="removeOnInstall" Root="HKLM"
Key="Software\Microsoft\Windows\CurrentVersion\Uninstall\Netbird" />
</Component>
<ComponentGroup Id="NetbirdFilesComponent">
<ComponentRef Id="NetbirdFiles" />
<ComponentRef Id="NetbirdAumidRegistry" />
<ComponentRef Id="NetbirdAutoStart" />
<ComponentRef Id="NetbirdLegacyHKCUCleanup" />
<ComponentRef Id="NetbirdLegacyWow6432Cleanup" />
</ComponentGroup>
<util:CloseApplication Id="CloseNetBird" CloseMessage="no" Target="netbird.exe" RebootPrompt="no" />

View File

@@ -11,6 +11,7 @@ import (
"github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral"
"github.com/netbirdio/netbird/management/server/activity"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/store"
)
@@ -47,6 +48,11 @@ type EphemeralManager struct {
lifeTime time.Duration
cleanupWindow time.Duration
// metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics
// no-op when the receiver is nil so deployments without an app
// metrics provider work unchanged.
metrics *telemetry.EphemeralPeersMetrics
}
// NewEphemeralManager instantiate new EphemeralManager
@@ -60,6 +66,15 @@ func NewEphemeralManager(store store.Store, peersManager peers.Manager) *Ephemer
}
}
// SetMetrics attaches a metrics collector. Safe to call once before
// LoadInitialPeers; later attachment is fine but earlier loads won't be
// reflected in the gauge. Pass nil to detach.
func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) {
e.peersLock.Lock()
e.metrics = m
e.peersLock.Unlock()
}
// LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head
// of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new
// head.
@@ -97,7 +112,9 @@ func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Pee
e.peersLock.Lock()
defer e.peersLock.Unlock()
e.removePeer(peer.ID)
if e.removePeer(peer.ID) {
e.metrics.DecPending(1)
}
// stop the unnecessary timer
if e.headPeer == nil && e.timer != nil {
@@ -123,6 +140,7 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.
}
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
e.metrics.IncPending()
if e.timer == nil {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
@@ -145,6 +163,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
for _, p := range peers {
e.addPeer(p.AccountID, p.ID, t)
}
e.metrics.AddPending(int64(len(peers)))
log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers))
}
@@ -181,6 +200,15 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
e.peersLock.Unlock()
// Drop the gauge by the number of entries we just took off the list,
// regardless of whether the subsequent DeletePeers call succeeds. The
// list invariant is what the gauge tracks; failed delete batches are
// counted separately via CountCleanupError so we can still see them.
if len(deletePeers) > 0 {
e.metrics.CountCleanupRun()
e.metrics.DecPending(int64(len(deletePeers)))
}
peerIDsPerAccount := make(map[string][]string)
for id, p := range deletePeers {
peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id)
@@ -191,7 +219,10 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true)
if err != nil {
log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err)
e.metrics.CountCleanupError()
continue
}
e.metrics.CountPeersCleaned(int64(len(peerIDs)))
}
}
@@ -211,9 +242,12 @@ func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline tim
e.tailPeer = ep
}
func (e *EphemeralManager) removePeer(id string) {
// removePeer drops the entry from the linked list. Returns true if a
// matching entry was found and removed so callers can keep the pending
// metric gauge in sync.
func (e *EphemeralManager) removePeer(id string) bool {
if e.headPeer == nil {
return
return false
}
if e.headPeer.id == id {
@@ -221,7 +255,7 @@ func (e *EphemeralManager) removePeer(id string) {
if e.tailPeer.id == id {
e.tailPeer = nil
}
return
return true
}
for p := e.headPeer; p.next != nil; p = p.next {
@@ -231,9 +265,10 @@ func (e *EphemeralManager) removePeer(id string) {
e.tailPeer = p
}
p.next = p.next.next
return
return true
}
}
return false
}
func (e *EphemeralManager) isPeerOnList(id string) bool {

View File

@@ -112,7 +112,11 @@ func (s *BaseServer) AuthManager() auth.Manager {
func (s *BaseServer) EphemeralManager() ephemeral.Manager {
return Create(s, func() ephemeral.Manager {
return manager.NewEphemeralManager(s.Store(), s.PeersManager())
em := manager.NewEphemeralManager(s.Store(), s.PeersManager())
if metrics := s.Metrics(); metrics != nil {
em.SetMetrics(metrics.EphemeralPeersMetrics())
}
return em
})
}

View File

@@ -6,9 +6,11 @@ import (
"net/netip"
"net/url"
"strings"
"time"
log "github.com/sirupsen/logrus"
goproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
@@ -185,9 +187,38 @@ func ToSyncResponse(ctx context.Context, config *nbconfig.Config, httpConfig *nb
response.NetworkMap.SshAuth = &proto.SSHAuth{AuthorizedUsers: hashedUsers, MachineUsers: machineUsers, UserIDClaim: userIDClaim}
}
// settings == nil → field stays nil → "no info in this snapshot", client
// preserves the deadline it already had. settings non-nil → emit either a
// valid deadline or the explicit-zero "disabled" sentinel via
// encodeSessionExpiresAt.
if settings != nil {
response.SessionExpiresAt = encodeSessionExpiresAt(
peer.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration),
)
}
return response
}
// encodeSessionExpiresAt encodes a server-side deadline into the 3-state wire
// representation used on LoginResponse, SyncResponse and
// ExtendAuthSessionResponse. See the proto comments on those messages.
//
// - deadline.IsZero() → returns &Timestamp{} (seconds=0, nanos=0): the
// "expiry disabled or peer is not SSO-tracked" sentinel; the client clears
// its anchor.
// - deadline non-zero → returns timestamppb.New(deadline): the new absolute
// UTC deadline.
//
// Returning nil ("no info, preserve client's anchor") is the caller's job —
// only meaningful on Sync builds where settings were not resolved.
func encodeSessionExpiresAt(deadline time.Time) *timestamppb.Timestamp {
if deadline.IsZero() {
return &timestamppb.Timestamp{}
}
return timestamppb.New(deadline)
}
func buildAuthorizedUsersProto(ctx context.Context, authorizedUsers map[string]map[string]struct{}) ([][]byte, map[string]*proto.MachineUserIndexes) {
userIDToIndex := make(map[string]uint32)
var hashedUsers [][]byte

View File

@@ -5,6 +5,7 @@ import (
"net/netip"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
@@ -200,3 +201,29 @@ func TestBuildJWTConfig_Audiences(t *testing.T) {
})
}
}
// TestEncodeSessionExpiresAt pins the wire encoding the client's
// applySessionDeadline depends on:
//
// - zero deadline → &Timestamp{} (seconds=0, nanos=0): the explicit
// "expiry disabled or peer is not SSO-tracked" sentinel.
// - non-zero → timestamppb.New(deadline): the absolute UTC deadline.
//
// The third state (nil pointer = "no info in this snapshot") is the caller's
// responsibility on the Sync path when settings could not be resolved; the
// helper itself never returns nil.
func TestEncodeSessionExpiresAt(t *testing.T) {
t.Run("zero deadline encodes as explicit-zero sentinel", func(t *testing.T) {
got := encodeSessionExpiresAt(time.Time{})
assert.NotNil(t, got, "must not return nil; nil means 'no info', not 'disabled'")
assert.Equal(t, int64(0), got.GetSeconds())
assert.Equal(t, int32(0), got.GetNanos())
})
t.Run("non-zero deadline round-trips", func(t *testing.T) {
deadline := time.Date(2030, 1, 2, 3, 4, 5, 0, time.UTC)
got := encodeSessionExpiresAt(deadline)
assert.NotNil(t, got)
assert.True(t, got.AsTime().Equal(deadline))
})
}

View File

@@ -522,10 +522,11 @@ func (s *Server) sendJob(ctx context.Context, peerKey wgtypes.Key, job *job.Even
}
func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) {
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
uncanceledCTX := context.WithoutCancel(ctx)
unlock := s.acquirePeerLockByUID(uncanceledCTX, peer.Key)
defer unlock()
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, streamStartTime)
s.cancelPeerRoutinesWithoutLock(uncanceledCTX, accountID, peer, streamStartTime)
}
func (s *Server) cancelPeerRoutinesWithoutLock(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) {
@@ -820,6 +821,80 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
}, nil
}
// ExtendAuthSession refreshes the peer's SSO session expiry deadline using a
// fresh JWT. The same JWT validation pipeline as Login is used. The tunnel
// stays up; no network map sync is performed. The new deadline is returned
// in ExtendAuthSessionResponse.SessionExpiresAt.
func (s *Server) ExtendAuthSession(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
extendReq := &proto.ExtendAuthSessionRequest{}
peerKey, err := s.parseRequest(ctx, req, extendReq)
if err != nil {
return nil, err
}
//nolint
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
if accountID, accErr := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String()); accErr == nil {
//nolint
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
}
jwt := extendReq.GetJwtToken()
if jwt == "" {
return nil, status.Errorf(codes.InvalidArgument, "jwt token is required")
}
var userID string
const attempts = 3
for i := 0; i < attempts; i++ {
userID, err = s.validateToken(ctx, peerKey.String(), jwt)
if err == nil {
break
}
if i == attempts-1 {
break
}
log.WithContext(ctx).Warnf("failed validating JWT token while extending session for peer %s: %v. Retrying (idP cache).", peerKey.String(), err)
select {
case <-time.After(200 * time.Millisecond):
case <-ctx.Done():
return nil, ctx.Err()
}
}
if err != nil {
return nil, err
}
if userID == "" {
return nil, status.Errorf(codes.Unauthenticated, "jwt token did not yield a user id")
}
deadline, err := s.accountManager.ExtendPeerSession(ctx, peerKey.String(), userID)
if err != nil {
log.WithContext(ctx).Warnf("failed extending session for peer %s: %v", peerKey.String(), err)
return nil, mapError(ctx, err)
}
// Success path normally returns a non-zero deadline. A defensive zero
// would still encode as the explicit "disabled" sentinel rather than nil,
// so the client clears any stale anchor instead of preserving it.
resp := &proto.ExtendAuthSessionResponse{
SessionExpiresAt: encodeSessionExpiresAt(deadline),
}
wgKey, err := s.secretsManager.GetWGKey()
if err != nil {
return nil, status.Errorf(codes.Internal, "failed processing request")
}
encrypted, err := encryption.EncryptMessage(peerKey, wgKey, resp)
if err != nil {
return nil, status.Errorf(codes.Internal, "failed encrypting response")
}
return &proto.EncryptedMessage{
WgPubKey: wgKey.PublicKey().String(),
Body: encrypted,
}, nil
}
func (s *Server) prepareLoginResponse(ctx context.Context, peer *nbpeer.Peer, netMap *types.NetworkMap, postureChecks []*posture.Checks) (*proto.LoginResponse, error) {
var relayToken *Token
var err error
@@ -843,6 +918,12 @@ func (s *Server) prepareLoginResponse(ctx context.Context, peer *nbpeer.Peer, ne
Checks: toProtocolChecks(ctx, postureChecks),
}
// settings is always non-nil here, so we never emit nil — encoder returns
// either a valid deadline or the explicit-zero "disabled" sentinel.
loginResp.SessionExpiresAt = encodeSessionExpiresAt(
peer.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration),
)
return loginResp, nil
}

View File

@@ -291,10 +291,15 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
return nil, status.NewPermissionDeniedError()
}
// Canonicalize the incoming range so a caller-supplied prefix with host bits
// (e.g. 100.64.1.1/16) compares equal to the masked form stored on network.Net.
newSettings.NetworkRange = newSettings.NetworkRange.Masked()
var oldSettings *types.Settings
var updateAccountPeers bool
var groupChangesAffectPeers bool
var reloadReverseProxy bool
var effectiveOldNetworkRange netip.Prefix
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
var groupsUpdated bool
@@ -308,6 +313,16 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
return err
}
// No lock: the transaction already holds Settings(Update), and network.Net is
// only mutated by reallocateAccountPeerIPs, which is reachable only through
// this same code path. A Share lock here would extend an unnecessary row lock
// and complicate ordering against updatePeerIPv6InTransaction.
network, err := transaction.GetAccountNetwork(ctx, store.LockingStrengthNone, accountID)
if err != nil {
return fmt.Errorf("get account network: %w", err)
}
effectiveOldNetworkRange = prefixFromIPNet(network.Net)
if oldSettings.Extra != nil && newSettings.Extra != nil &&
oldSettings.Extra.PeerApprovalEnabled && !newSettings.Extra.PeerApprovalEnabled {
approvedCount, err := transaction.ApproveAccountPeers(ctx, accountID)
@@ -321,7 +336,7 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
}
}
if oldSettings.NetworkRange != newSettings.NetworkRange {
if newSettings.NetworkRange.IsValid() && newSettings.NetworkRange != effectiveOldNetworkRange {
if err = am.reallocateAccountPeerIPs(ctx, transaction, accountID, newSettings.NetworkRange); err != nil {
return err
}
@@ -340,7 +355,17 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
oldSettings.LazyConnectionEnabled != newSettings.LazyConnectionEnabled ||
oldSettings.DNSDomain != newSettings.DNSDomain ||
oldSettings.AutoUpdateVersion != newSettings.AutoUpdateVersion ||
oldSettings.AutoUpdateAlways != newSettings.AutoUpdateAlways {
oldSettings.AutoUpdateAlways != newSettings.AutoUpdateAlways ||
oldSettings.PeerLoginExpirationEnabled != newSettings.PeerLoginExpirationEnabled ||
oldSettings.PeerLoginExpiration != newSettings.PeerLoginExpiration {
// Session deadline is derived from LastLogin + PeerLoginExpiration
// on every Login/Sync response. Without a fan-out push, connected
// peers keep the deadline they received at login time and only see
// the new value after the next unrelated NetworkMap change. Add
// these two fields to the trigger list so admin-side expiry tweaks
// (e.g. shortening from 24h to 1h) reach every connected peer
// within seconds, which is what the proactive-warning feature
// relies on (see client/internal/auth/sessionwatch).
updateAccountPeers = true
}
@@ -396,9 +421,9 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
}
am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountDNSDomainUpdated, eventMeta)
}
if oldSettings.NetworkRange != newSettings.NetworkRange {
if newSettings.NetworkRange.IsValid() && newSettings.NetworkRange != effectiveOldNetworkRange {
eventMeta := map[string]any{
"old_network_range": oldSettings.NetworkRange.String(),
"old_network_range": effectiveOldNetworkRange.String(),
"new_network_range": newSettings.NetworkRange.String(),
}
am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountNetworkRangeUpdated, eventMeta)
@@ -443,6 +468,22 @@ func ipv6SettingsChanged(old, updated *types.Settings) bool {
return !slices.Equal(oldGroups, newGroups)
}
// prefixFromIPNet returns the overlay prefix actually allocated on the account
// network, or an invalid prefix if none is set. Settings.NetworkRange is a
// user-facing override that is empty on legacy accounts, so the effective
// range must be read from network.Net to compare against an incoming update.
func prefixFromIPNet(ipNet net.IPNet) netip.Prefix {
if ipNet.IP == nil {
return netip.Prefix{}
}
addr, ok := netip.AddrFromSlice(ipNet.IP)
if !ok {
return netip.Prefix{}
}
ones, _ := ipNet.Mask.Size()
return netip.PrefixFrom(addr.Unmap(), ones)
}
func (am *DefaultAccountManager) validateSettingsUpdate(ctx context.Context, transaction store.Store, newSettings, oldSettings *types.Settings, userID, accountID string) error {
halfYearLimit := 180 * 24 * time.Hour
if newSettings.PeerLoginExpiration > halfYearLimit {
@@ -1837,35 +1878,32 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth auth.UserAu
return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
}
// SyncAndMarkPeer is the per-Sync entry point: it refreshes the peer's
// network map and then marks the peer connected with a session token
// derived from syncTime (the moment the gRPC stream opened). Any
// concurrent stream that started earlier loses the optimistic-lock race
// in MarkPeerConnected and bails without writing.
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
peer, netMap, postureChecks, dnsfwdPort, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
if err != nil {
return nil, nil, nil, 0, fmt.Errorf("error syncing peer: %w", err)
}
err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID, syncTime)
if err != nil {
if err := am.MarkPeerConnected(ctx, peerPubKey, realIP, accountID, syncTime.UnixNano()); err != nil {
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
}
return peer, netMap, postureChecks, dnsfwdPort, nil
}
// OnPeerDisconnected is invoked when a sync stream ends. It marks the
// peer disconnected only when the stored SessionStartedAt matches the
// nanosecond token derived from streamStartTime — i.e. only when this
// is the stream that currently owns the peer's session. A mismatch
// means a newer stream has already replaced us, so the disconnect is
// dropped.
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error {
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
if err != nil {
log.WithContext(ctx).Warnf("failed to get peer %s for disconnect check: %v", peerPubKey, err)
return nil
}
if peer.Status.LastSeen.After(streamStartTime) {
log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s > streamStart=%s), skipping disconnect",
peerPubKey, peer.Status.LastSeen.Format(time.RFC3339), streamStartTime.Format(time.RFC3339))
return nil
}
err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID, time.Now().UTC())
if err != nil {
if err := am.MarkPeerDisconnected(ctx, peerPubKey, accountID, streamStartTime.UnixNano()); err != nil {
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
}
return nil

View File

@@ -61,7 +61,8 @@ type Manager interface {
GetUserFromUserAuth(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
ListUsers(ctx context.Context, accountID string) ([]*types.User, error)
GetPeers(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error
MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error
MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error
DeletePeer(ctx context.Context, accountID, peerID, userID string) error
UpdatePeer(ctx context.Context, accountID, userID string, p *nbpeer.Peer) (*nbpeer.Peer, error)
UpdatePeerIP(ctx context.Context, accountID, userID, peerID string, newIP netip.Addr) error
@@ -108,6 +109,7 @@ type Manager interface {
UpdateAccountSettings(ctx context.Context, accountID, userID string, newSettings *types.Settings) (*types.Settings, error)
UpdateAccountOnboarding(ctx context.Context, accountID, userID string, newOnboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
LoginPeer(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) // used by peer gRPC API
ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) // used by peer gRPC API for ExtendAuthSession
SyncPeer(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) // used by peer gRPC API
GetExternalCacheManager() ExternalCacheManager
GetPostureChecks(ctx context.Context, accountID, postureChecksID, userID string) (*posture.Checks, error)

View File

@@ -1304,18 +1304,47 @@ func (mr *MockManagerMockRecorder) LoginPeer(ctx, login interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoginPeer", reflect.TypeOf((*MockManager)(nil).LoginPeer), ctx, login)
}
// MarkPeerConnected mocks base method.
func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error {
// ExtendPeerSession mocks base method.
func (m *MockManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, connected, realIP, accountID, syncTime)
ret := m.ctrl.Call(m, "ExtendPeerSession", ctx, peerPubKey, userID)
ret0, _ := ret[0].(time.Time)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ExtendPeerSession indicates an expected call of ExtendPeerSession.
func (mr *MockManagerMockRecorder) ExtendPeerSession(ctx, peerPubKey, userID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExtendPeerSession", reflect.TypeOf((*MockManager)(nil).ExtendPeerSession), ctx, peerPubKey, userID)
}
// MarkPeerConnected mocks base method.
func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, realIP, accountID, sessionStartedAt)
ret0, _ := ret[0].(error)
return ret0
}
// MarkPeerConnected indicates an expected call of MarkPeerConnected.
func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, connected, realIP, accountID, syncTime interface{}) *gomock.Call {
func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, realIP, accountID, sessionStartedAt interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, connected, realIP, accountID, syncTime)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, realIP, accountID, sessionStartedAt)
}
// MarkPeerDisconnected mocks base method.
func (m *MockManager) MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MarkPeerDisconnected", ctx, peerKey, accountID, sessionStartedAt)
ret0, _ := ret[0].(error)
return ret0
}
// MarkPeerDisconnected indicates an expected call of MarkPeerDisconnected.
func (mr *MockManagerMockRecorder) MarkPeerDisconnected(ctx, peerKey, accountID, sessionStartedAt interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerDisconnected", reflect.TypeOf((*MockManager)(nil).MarkPeerDisconnected), ctx, peerKey, accountID, sessionStartedAt)
}
// OnPeerDisconnected mocks base method.

View File

@@ -1813,7 +1813,7 @@ func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) {
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
require.NoError(t, err, "unable to get the account")
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano())
require.NoError(t, err, "unable to mark peer connected")
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
@@ -1884,7 +1884,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
require.NoError(t, err, "unable to get the account")
// when we mark peer as connected, the peer login expiration routine should trigger
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano())
require.NoError(t, err, "unable to mark peer connected")
failed := waitTimeout(wg, time.Second)
@@ -1910,15 +1910,16 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
}, false)
require.NoError(t, err, "unable to add peer")
t.Run("disconnect peer when streamStartTime is after LastSeen", func(t *testing.T) {
err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC())
t.Run("disconnect peer when session token matches", func(t *testing.T) {
streamStartTime := time.Now().UTC()
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano())
require.NoError(t, err, "unable to mark peer connected")
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err, "unable to get peer")
require.True(t, peer.Status.Connected, "peer should be connected")
streamStartTime := time.Now().UTC()
require.Equal(t, streamStartTime.UnixNano(), peer.Status.SessionStartedAt,
"SessionStartedAt should equal the token we passed in")
err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime)
require.NoError(t, err)
@@ -1926,49 +1927,127 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.False(t, peer.Status.Connected, "peer should be disconnected")
require.Equal(t, int64(0), peer.Status.SessionStartedAt, "SessionStartedAt should be reset to 0")
})
t.Run("skip disconnect when LastSeen is after streamStartTime (zombie stream protection)", func(t *testing.T) {
err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC())
t.Run("skip disconnect when stored session is newer (zombie stream protection)", func(t *testing.T) {
// Newer stream wins on connect (sets SessionStartedAt = now ns).
streamStartTime := time.Now().UTC()
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano())
require.NoError(t, err, "unable to mark peer connected")
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.True(t, peer.Status.Connected, "peer should be connected")
streamStartTime := peer.Status.LastSeen.Add(-1 * time.Hour)
// Older stream tries to mark disconnect with its own (older) session token —
// fencing kicks in and the write is dropped.
staleStreamStartTime := streamStartTime.Add(-1 * time.Hour)
err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime)
err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, staleStreamStartTime)
require.NoError(t, err)
peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.True(t, peer.Status.Connected,
"peer should remain connected because LastSeen > streamStartTime (zombie stream protection)")
"peer should remain connected because the stored session is newer than the disconnect token")
require.Equal(t, streamStartTime.UnixNano(), peer.Status.SessionStartedAt,
"SessionStartedAt should still hold the winning stream's token")
})
t.Run("skip stale connect when peer already has newer LastSeen (blocked goroutine protection)", func(t *testing.T) {
t.Run("skip stale connect when stored session is newer (blocked goroutine protection)", func(t *testing.T) {
node2SyncTime := time.Now().UTC()
err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node2SyncTime)
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node2SyncTime.UnixNano())
require.NoError(t, err, "node 2 should connect peer")
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.True(t, peer.Status.Connected, "peer should be connected")
require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(), "LastSeen should be node2SyncTime")
require.Equal(t, node2SyncTime.UnixNano(), peer.Status.SessionStartedAt,
"SessionStartedAt should equal node2SyncTime token")
node1StaleSyncTime := node2SyncTime.Add(-1 * time.Minute)
err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node1StaleSyncTime)
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node1StaleSyncTime.UnixNano())
require.NoError(t, err, "stale connect should not return error")
peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.True(t, peer.Status.Connected, "peer should still be connected")
require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(),
"LastSeen should NOT be overwritten by stale syncTime from blocked goroutine")
require.Equal(t, node2SyncTime.UnixNano(), peer.Status.SessionStartedAt,
"SessionStartedAt should NOT be overwritten by stale token from blocked goroutine")
})
}
// TestDefaultAccountManager_MarkPeerConnected_ConcurrentRace exercises the
// fencing protocol under contention: many goroutines race to mark the
// same peer connected with distinct session tokens at the same time.
// The contract is that the highest token always wins and is what remains
// in the store, regardless of execution order.
func TestDefaultAccountManager_MarkPeerConnected_ConcurrentRace(t *testing.T) {
manager, _, err := createManager(t)
require.NoError(t, err, "unable to create account manager")
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
require.NoError(t, err, "unable to get account")
key, err := wgtypes.GenerateKey()
require.NoError(t, err, "unable to generate WireGuard key")
peerPubKey := key.PublicKey().String()
_, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
Key: peerPubKey,
Meta: nbpeer.PeerSystemMeta{Hostname: "race-peer"},
}, false)
require.NoError(t, err, "unable to add peer")
const workers = 16
base := time.Now().UTC().UnixNano()
tokens := make([]int64, workers)
for i := range tokens {
// Spread tokens by 1ms so the comparison is unambiguous; the
// largest is index workers-1.
tokens[i] = base + int64(i)*int64(time.Millisecond)
}
expected := tokens[workers-1]
var ready sync.WaitGroup
ready.Add(workers)
var start sync.WaitGroup
start.Add(1)
var done sync.WaitGroup
done.Add(workers)
// require.* calls t.FailNow which is documented as unsafe from
// non-test goroutines (it calls runtime.Goexit on the wrong stack and
// races with the WaitGroup). Collect errors here and assert from the
// main goroutine after done.Wait().
errs := make(chan error, workers)
for i := 0; i < workers; i++ {
token := tokens[i]
go func() {
defer done.Done()
ready.Done()
start.Wait()
errs <- manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, token)
}()
}
ready.Wait()
start.Done()
done.Wait()
close(errs)
for err := range errs {
require.NoError(t, err, "MarkPeerConnected must not error under contention")
}
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
require.NoError(t, err)
require.True(t, peer.Status.Connected, "peer should be connected after the race")
require.Equal(t, expected, peer.Status.SessionStartedAt,
"the largest token must win regardless of execution order")
}
func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *testing.T) {
manager, _, err := createManager(t)
require.NoError(t, err, "unable to create account manager")
@@ -1991,7 +2070,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test
account, err := manager.Store.GetAccount(context.Background(), accountID)
require.NoError(t, err, "unable to get the account")
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano())
require.NoError(t, err, "unable to mark peer connected")
wg := &sync.WaitGroup{}
@@ -3970,6 +4049,96 @@ func TestDefaultAccountManager_UpdateAccountSettings_NetworkRangeChange(t *testi
}
}
// TestDefaultAccountManager_UpdateAccountSettings_NetworkRangePreserved guards against
// peer IP reallocation when a settings update carries the network range that is already
// in use. Legacy accounts have Settings.NetworkRange unset in the DB while network.Net
// holds the actual allocated overlay; the dashboard backfills the GET response from
// network.Net and echoes the value back on PUT, so the diff must be against the
// effective range to avoid renumbering every peer on an unrelated settings change.
func TestDefaultAccountManager_UpdateAccountSettings_NetworkRangePreserved(t *testing.T) {
manager, _, account, peer1, peer2, peer3 := setupNetworkMapTest(t)
ctx := context.Background()
settings, err := manager.Store.GetAccountSettings(ctx, store.LockingStrengthNone, account.Id)
require.NoError(t, err)
require.False(t, settings.NetworkRange.IsValid(), "precondition: new accounts leave Settings.NetworkRange unset")
network, err := manager.Store.GetAccountNetwork(ctx, store.LockingStrengthNone, account.Id)
require.NoError(t, err)
require.NotNil(t, network.Net.IP, "precondition: network.Net should be allocated")
addr, ok := netip.AddrFromSlice(network.Net.IP)
require.True(t, ok)
ones, _ := network.Net.Mask.Size()
effective := netip.PrefixFrom(addr.Unmap(), ones)
require.True(t, effective.IsValid())
before := map[string]netip.Addr{peer1.ID: peer1.IP, peer2.ID: peer2.IP, peer3.ID: peer3.IP}
// Round-trip the effective range as if the dashboard echoed back the GET-backfilled value.
_, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{
PeerLoginExpirationEnabled: true,
PeerLoginExpiration: types.DefaultPeerLoginExpiration,
NetworkRange: effective,
Extra: &types.ExtraSettings{},
})
require.NoError(t, err)
peers, err := manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "")
require.NoError(t, err)
require.Len(t, peers, len(before))
for _, p := range peers {
assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change when range matches effective", p.ID)
}
// Carrying the same range with host bits set must also be a no-op once canonicalized.
hostBitsForm := netip.PrefixFrom(peer1.IP, ones)
require.NotEqual(t, effective, hostBitsForm, "precondition: host-bit form should differ before masking")
_, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{
PeerLoginExpirationEnabled: true,
PeerLoginExpiration: types.DefaultPeerLoginExpiration,
NetworkRange: hostBitsForm,
Extra: &types.ExtraSettings{},
})
require.NoError(t, err)
peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "")
require.NoError(t, err)
for _, p := range peers {
assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change for host-bit-set equivalent range", p.ID)
}
// Omitting NetworkRange (invalid prefix) must also be a no-op.
_, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{
PeerLoginExpirationEnabled: true,
PeerLoginExpiration: types.DefaultPeerLoginExpiration,
Extra: &types.ExtraSettings{},
})
require.NoError(t, err)
peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "")
require.NoError(t, err)
for _, p := range peers {
assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change when NetworkRange omitted", p.ID)
}
// Sanity: an actually different range still triggers reallocation.
newRange := netip.MustParsePrefix("100.99.0.0/16")
_, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{
PeerLoginExpirationEnabled: true,
PeerLoginExpiration: types.DefaultPeerLoginExpiration,
NetworkRange: newRange,
Extra: &types.ExtraSettings{},
})
require.NoError(t, err)
peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "")
require.NoError(t, err)
for _, p := range peers {
assert.True(t, newRange.Contains(p.IP), "peer %s should be in new range %s, got %s", p.ID, newRange, p.IP)
assert.NotEqual(t, before[p.ID], p.IP, "peer %s IP should change on real range update", p.ID)
}
}
func TestDefaultAccountManager_UpdateAccountSettings_IPv6EnabledGroups(t *testing.T) {
manager, _, account, peer1, peer2, peer3 := setupNetworkMapTest(t)
ctx := context.Background()

View File

@@ -240,6 +240,10 @@ const (
AccountLocalMfaEnabled Activity = 123
// AccountLocalMfaDisabled indicates that a user disabled TOTP MFA for local users
AccountLocalMfaDisabled Activity = 124
// UserExtendedPeerSession indicates that a user refreshed their peer's
// SSO session deadline via ExtendAuthSession without re-establishing the
// tunnel. Distinct from UserLoggedInPeer (full interactive login).
UserExtendedPeerSession Activity = 125
AccountDeleted Activity = 99999
)
@@ -394,6 +398,8 @@ var activityMap = map[Activity]Code{
AccountLocalMfaEnabled: {"Account local MFA enabled", "account.setting.local.mfa.enable"},
AccountLocalMfaDisabled: {"Account local MFA disabled", "account.setting.local.mfa.disable"},
UserExtendedPeerSession: {"User extended peer session", "user.peer.session.extend"},
DomainAdded: {"Domain added", "domain.add"},
DomainDeleted: {"Domain deleted", "domain.delete"},
DomainValidated: {"Domain validated", "domain.validate"},

View File

@@ -6,7 +6,9 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"path"
"strings"
"github.com/dexidp/dex/storage"
@@ -138,10 +140,13 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) {
return nil, fmt.Errorf("invalid IdP storage config: %w", err)
}
// Build CLI redirect URIs including the device callback (both relative and absolute)
// Build CLI redirect URIs including the device callback. Dex uses the issuer-relative
// path (for example, /oauth2/device/callback) when completing the device flow, so
// include it explicitly in addition to the legacy bare path and absolute URL.
cliRedirectURIs := c.CLIRedirectURIs
cliRedirectURIs = append(cliRedirectURIs, "/device/callback")
cliRedirectURIs = append(cliRedirectURIs, c.Issuer+"/device/callback")
cliRedirectURIs = append(cliRedirectURIs, issuerRelativeDeviceCallback(c.Issuer))
cliRedirectURIs = append(cliRedirectURIs, strings.TrimSuffix(c.Issuer, "/")+"/device/callback")
// Build dashboard redirect URIs including the OAuth callback for proxy authentication
dashboardRedirectURIs := c.DashboardRedirectURIs
@@ -154,6 +159,10 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) {
// MGMT api and the dashboard, adding baseURL means less configuration for the instance admin
dashboardPostLogoutRedirectURIs = append(dashboardPostLogoutRedirectURIs, baseURL)
redirectURIs := make([]string, 0)
redirectURIs = append(redirectURIs, cliRedirectURIs...)
redirectURIs = append(redirectURIs, dashboardRedirectURIs...)
cfg := &dex.YAMLConfig{
Issuer: c.Issuer,
Storage: dex.Storage{
@@ -179,14 +188,14 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) {
ID: staticClientDashboard,
Name: "NetBird Dashboard",
Public: true,
RedirectURIs: dashboardRedirectURIs,
RedirectURIs: redirectURIs,
PostLogoutRedirectURIs: sanitizePostLogoutRedirectURIs(dashboardPostLogoutRedirectURIs),
},
{
ID: staticClientCLI,
Name: "NetBird CLI",
Public: true,
RedirectURIs: cliRedirectURIs,
RedirectURIs: redirectURIs,
},
},
StaticConnectors: c.StaticConnectors,
@@ -217,6 +226,14 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) {
return cfg, nil
}
func issuerRelativeDeviceCallback(issuer string) string {
u, err := url.Parse(issuer)
if err != nil || u.Path == "" {
return "/device/callback"
}
return path.Join(u.Path, "/device/callback")
}
// Due to how the frontend generates the logout, sometimes it appends a trailing slash
// and because Dex only allows exact matches, we need to make sure we always have both
// versions of each provided uri
@@ -299,7 +316,7 @@ func resolveSessionCookieEncryptionKey(configuredKey string) (string, error) {
}
}
return "", fmt.Errorf("invalid embedded IdP session cookie encryption key: %s (or sessionCookieEncryptionKey) must be 16, 24, or 32 bytes as a raw string or base64-encoded to one of those lengths; got %d raw bytes", sessionCookieEncryptionKeyEnv, len([]byte(key)))
return "", fmt.Errorf("invalid embedded IdP session cookie encryption key:%s (or sessionCookieEncryptionKey) must be 16, 24, or 32 bytes as a raw string or base64-encoded to one of those lengths; got %d raw bytes", sessionCookieEncryptionKeyEnv, len([]byte(key)))
}
func validSessionCookieEncryptionKeyLength(length int) bool {

View File

@@ -314,6 +314,34 @@ func TestEmbeddedIdPManager_UpdateUserPassword(t *testing.T) {
})
}
func TestEmbeddedIdPConfig_ToYAMLConfig_IncludesDeviceCallbackRedirectURI(t *testing.T) {
config := &EmbeddedIdPConfig{
Enabled: true,
Issuer: "https://example.com/oauth2",
Storage: EmbeddedStorageConfig{
Type: "sqlite3",
Config: EmbeddedStorageTypeConfig{
File: filepath.Join(t.TempDir(), "dex.db"),
},
},
}
yamlConfig, err := config.ToYAMLConfig()
require.NoError(t, err)
var cliRedirectURIs []string
for _, client := range yamlConfig.StaticClients {
if client.ID == staticClientCLI {
cliRedirectURIs = client.RedirectURIs
break
}
}
require.NotEmpty(t, cliRedirectURIs)
assert.Contains(t, cliRedirectURIs, "/device/callback")
assert.Contains(t, cliRedirectURIs, "/oauth2/device/callback")
assert.Contains(t, cliRedirectURIs, "https://example.com/oauth2/device/callback")
}
func TestEmbeddedIdPConfig_ToYAMLConfig_SessionCookieEncryptionKey(t *testing.T) {
t.Setenv(sessionCookieEncryptionKeyEnv, "")

View File

@@ -38,7 +38,8 @@ type MockAccountManager struct {
GetUserFromUserAuthFunc func(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
ListUsersFunc func(ctx context.Context, accountID string) ([]*types.User, error)
GetPeersFunc func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
MarkPeerConnectedFunc func(ctx context.Context, peerKey string, connected bool, realIP net.IP, syncTime time.Time) error
MarkPeerConnectedFunc func(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error
MarkPeerDisconnectedFunc func(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error
SyncAndMarkPeerFunc func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
DeletePeerFunc func(ctx context.Context, accountID, peerKey, userID string) error
GetNetworkMapFunc func(ctx context.Context, peerKey string) (*types.NetworkMap, error)
@@ -97,6 +98,7 @@ type MockAccountManager struct {
GetPeerFunc func(ctx context.Context, accountID, peerID, userID string) (*nbpeer.Peer, error)
UpdateAccountSettingsFunc func(ctx context.Context, accountID, userID string, newSettings *types.Settings) (*types.Settings, error)
LoginPeerFunc func(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
ExtendPeerSessionFunc func(ctx context.Context, peerPubKey, userID string) (time.Time, error)
SyncPeerFunc func(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
InviteUserFunc func(ctx context.Context, accountID string, initiatorUserID string, targetUserEmail string) error
ApproveUserFunc func(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error)
@@ -227,7 +229,14 @@ func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID str
return nil, nil, nil, 0, status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
}
func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error {
func (am *MockAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error {
// Mirror DefaultAccountManager.OnPeerDisconnected: drive the fencing
// hook so tests that inject MarkPeerDisconnectedFunc actually observe
// disconnect events. Falls through to nil when no hook is set, which
// is the original behaviour.
if am.MarkPeerDisconnectedFunc != nil {
return am.MarkPeerDisconnectedFunc(ctx, peerPubKey, accountID, streamStartTime.UnixNano())
}
return nil
}
@@ -328,13 +337,21 @@ func (am *MockAccountManager) GetAccountIDByUserID(ctx context.Context, userAuth
}
// MarkPeerConnected mock implementation of MarkPeerConnected from server.AccountManager interface
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error {
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error {
if am.MarkPeerConnectedFunc != nil {
return am.MarkPeerConnectedFunc(ctx, peerKey, connected, realIP, syncTime)
return am.MarkPeerConnectedFunc(ctx, peerKey, realIP, accountID, sessionStartedAt)
}
return status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
}
// MarkPeerDisconnected mock implementation of MarkPeerDisconnected from server.AccountManager interface
func (am *MockAccountManager) MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error {
if am.MarkPeerDisconnectedFunc != nil {
return am.MarkPeerDisconnectedFunc(ctx, peerKey, accountID, sessionStartedAt)
}
return status.Errorf(codes.Unimplemented, "method MarkPeerDisconnected is not implemented")
}
// DeleteAccount mock implementation of DeleteAccount from server.AccountManager interface
func (am *MockAccountManager) DeleteAccount(ctx context.Context, accountID, userID string) error {
if am.DeleteAccountFunc != nil {
@@ -844,6 +861,14 @@ func (am *MockAccountManager) LoginPeer(ctx context.Context, login types.PeerLog
return nil, nil, nil, status.Errorf(codes.Unimplemented, "method LoginPeer is not implemented")
}
// ExtendPeerSession mocks ExtendPeerSession of the AccountManager interface
func (am *MockAccountManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
if am.ExtendPeerSessionFunc != nil {
return am.ExtendPeerSessionFunc(ctx, peerPubKey, userID)
}
return time.Time{}, status.Errorf(codes.Unimplemented, "method ExtendPeerSession is not implemented")
}
// SyncPeer mocks SyncPeer of the AccountManager interface
func (am *MockAccountManager) SyncPeer(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
if am.SyncPeerFunc != nil {

View File

@@ -16,7 +16,6 @@ import (
"golang.org/x/exp/maps"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/management/server/geolocation"
"github.com/netbirdio/netbird/management/server/idp"
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
"github.com/netbirdio/netbird/management/server/permissions/modules"
@@ -29,6 +28,7 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -63,56 +63,64 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
return am.Store.GetUserPeers(ctx, store.LockingStrengthNone, accountID, userID)
}
// MarkPeerConnected marks peer as connected (true) or disconnected (false)
// syncTime is used as the LastSeen timestamp and for stale request detection
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error {
var peer *nbpeer.Peer
var settings *types.Settings
var expired bool
var err error
var skipped bool
// MarkPeerConnected marks a peer as connected with optimistic-locked
// fencing on PeerStatus.SessionStartedAt. The sessionStartedAt argument
// is the start time of the gRPC sync stream that owns this update,
// expressed as Unix nanoseconds — only the call whose token is greater
// than what's stored wins. LastSeen is written by the database itself;
// we never pass it down.
//
// Disconnects use MarkPeerDisconnected and require the session to match
// exactly; see PeerStatus.SessionStartedAt for the protocol.
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64) error {
start := time.Now()
defer func() {
am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusConnect, time.Since(start))
}()
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey)
if err != nil {
return err
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
if err != nil {
outcome := telemetry.PeerStatusError
if s, ok := status.FromError(err); ok && s.Type() == status.NotFound {
outcome = telemetry.PeerStatusPeerNotFound
}
if connected && !syncTime.After(peer.Status.LastSeen) {
log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s >= syncTime=%s), skipping connect",
peer.ID, peer.Status.LastSeen.Format(time.RFC3339), syncTime.Format(time.RFC3339))
skipped = true
return nil
}
expired, err = updatePeerStatusAndLocation(ctx, am.geo, transaction, peer, connected, realIP, accountID, syncTime)
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, outcome)
return err
})
if skipped {
}
updated, err := am.Store.MarkPeerConnectedIfNewerSession(ctx, accountID, peer.ID, sessionStartedAt)
if err != nil {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusError)
return err
}
if !updated {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusStale)
log.WithContext(ctx).Tracef("peer %s already has a newer session in store, skipping connect", peer.ID)
return nil
}
if err != nil {
return err
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusApplied)
if am.geo != nil && realIP != nil {
am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP)
}
expired := peer.Status != nil && peer.Status.LoginExpired
if peer.AddedWithSSOLogin() {
settings, err = am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
if err != nil {
return err
}
if peer.LoginExpirationEnabled && settings.PeerLoginExpirationEnabled {
am.schedulePeerLoginExpiration(ctx, accountID)
}
if peer.InactivityExpirationEnabled && settings.PeerInactivityExpirationEnabled {
am.checkAndSchedulePeerInactivityExpiration(ctx, accountID)
}
}
if expired {
err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID})
if err != nil {
if err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID}); err != nil {
return fmt.Errorf("notify network map controller of peer update: %w", err)
}
}
@@ -120,41 +128,60 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
return nil
}
func updatePeerStatusAndLocation(ctx context.Context, geo geolocation.Geolocation, transaction store.Store, peer *nbpeer.Peer, connected bool, realIP net.IP, accountID string, syncTime time.Time) (bool, error) {
oldStatus := peer.Status.Copy()
newStatus := oldStatus
newStatus.LastSeen = syncTime
newStatus.Connected = connected
// whenever peer got connected that means that it logged in successfully
if newStatus.Connected {
newStatus.LoginExpired = false
}
peer.Status = newStatus
// MarkPeerDisconnected marks a peer as disconnected, but only when the
// stored session token matches the one passed in. A mismatch means a
// newer stream has already taken ownership of the peer — disconnects from
// the older stream are ignored. LastSeen is written by the database.
func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64) error {
start := time.Now()
defer func() {
am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusDisconnect, time.Since(start))
}()
if geo != nil && realIP != nil {
location, err := geo.Lookup(realIP)
if err != nil {
log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
} else {
peer.Location.ConnectionIP = realIP
peer.Location.CountryCode = location.Country.ISOCode
peer.Location.CityName = location.City.Names.En
peer.Location.GeoNameID = location.City.GeonameID
err = transaction.SavePeerLocation(ctx, accountID, peer)
if err != nil {
log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err)
}
}
}
log.WithContext(ctx).Debugf("saving peer status for peer %s is connected: %t", peer.ID, connected)
err := transaction.SavePeerStatus(ctx, accountID, peer.ID, *newStatus)
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
if err != nil {
return false, err
outcome := telemetry.PeerStatusError
if s, ok := status.FromError(err); ok && s.Type() == status.NotFound {
outcome = telemetry.PeerStatusPeerNotFound
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, outcome)
return err
}
return oldStatus.LoginExpired, nil
updated, err := am.Store.MarkPeerDisconnectedIfSameSession(ctx, accountID, peer.ID, sessionStartedAt)
if err != nil {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusError)
return err
}
if !updated {
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusStale)
log.WithContext(ctx).Tracef("peer %s session token mismatch on disconnect (token=%d), skipping",
peer.ID, sessionStartedAt)
return nil
}
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusApplied)
return nil
}
// updatePeerLocationIfChanged refreshes the geolocation on a separate
// row update, only when the connection IP actually changed. Geo lookups
// are expensive so we skip same-IP reconnects.
func (am *DefaultAccountManager) updatePeerLocationIfChanged(ctx context.Context, accountID string, peer *nbpeer.Peer, realIP net.IP) {
if peer.Location.ConnectionIP != nil && peer.Location.ConnectionIP.Equal(realIP) {
return
}
location, err := am.geo.Lookup(realIP)
if err != nil {
log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
return
}
peer.Location.ConnectionIP = realIP
peer.Location.CountryCode = location.Country.ISOCode
peer.Location.CityName = location.City.Names.En
peer.Location.GeoNameID = location.City.GeonameID
if err := am.Store.SavePeerLocation(ctx, accountID, peer); err != nil {
log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err)
}
}
// UpdatePeer updates peer. Only Peer.Name, Peer.SSHEnabled, Peer.LoginExpirationEnabled and Peer.InactivityExpirationEnabled can be updated.
@@ -1101,6 +1128,79 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
return p, nmap, pc, err
}
// ExtendPeerSession refreshes the peer's SSO session deadline by updating
// LastLogin after a successful JWT validation. The tunnel is untouched: no
// network map sync, no peer reconnect.
//
// Preconditions enforced here:
// - userID must be present (caller validated the JWT and extracted the user ID).
// - The peer must exist and be SSO-registered (AddedWithSSOLogin) with
// LoginExpirationEnabled.
// - Account-level PeerLoginExpirationEnabled must be true.
// - The JWT user must match peer.UserID (mirrors LoginPeer at peer.go ~1028).
//
// Returns the new absolute UTC deadline.
func (am *DefaultAccountManager) ExtendPeerSession(ctx context.Context, peerPubKey, userID string) (time.Time, error) {
if userID == "" {
return time.Time{}, status.Errorf(status.PermissionDenied, "session extend requires a JWT")
}
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
if err != nil {
return time.Time{}, err
}
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
if err != nil {
return time.Time{}, err
}
if !settings.PeerLoginExpirationEnabled {
return time.Time{}, status.Errorf(status.PreconditionFailed, "peer login expiration is disabled for the account")
}
var refreshed *nbpeer.Peer
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err := transaction.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey)
if err != nil {
return err
}
if !peer.AddedWithSSOLogin() || !peer.LoginExpirationEnabled {
return status.Errorf(status.PreconditionFailed, "peer is not eligible for session extension")
}
if peer.UserID != userID {
log.WithContext(ctx).Warnf("user mismatch when extending session for peer %s: peer user %s, jwt user %s", peer.ID, peer.UserID, userID)
return status.NewPeerLoginMismatchError()
}
peer = peer.UpdateLastLogin()
if err := transaction.SavePeer(ctx, accountID, peer); err != nil {
return err
}
if err := transaction.SaveUserLastLogin(ctx, accountID, userID, peer.GetLastLogin()); err != nil {
log.WithContext(ctx).Debugf("failed to update user last login during session extend: %v", err)
}
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.UserExtendedPeerSession, peer.EventMeta(am.networkMapController.GetDNSDomain(settings)))
refreshed = peer
return nil
})
if err != nil {
return time.Time{}, err
}
// Reschedule the per-account expiration job. schedulePeerLoginExpiration
// is a no-op when a job is already running, but the running job will pick
// up the new LastLogin on its next tick. Calling it here is harmless and
// guarantees a job is in flight even if a prior one ended right before
// the extend.
am.schedulePeerLoginExpiration(ctx, accountID)
return refreshed.SessionExpiresAt(settings.PeerLoginExpirationEnabled, settings.PeerLoginExpiration), nil
}
// getPeerPostureChecks returns the posture checks for the peer.
func getPeerPostureChecks(ctx context.Context, transaction store.Store, accountID, peerID string) ([]*posture.Checks, error) {
policies, err := transaction.GetAccountPolicies(ctx, store.LockingStrengthNone, accountID)

View File

@@ -74,8 +74,19 @@ type ProxyMeta struct {
}
type PeerStatus struct { //nolint:revive
// LastSeen is the last time peer was connected to the management service
// LastSeen is the last time the peer status was updated (i.e. the last
// time we observed the peer being alive on a sync stream). Written by
// the database (CURRENT_TIMESTAMP) — callers do not supply it.
LastSeen time.Time
// SessionStartedAt records when the currently-active sync stream began,
// stored as Unix nanoseconds. It acts as the optimistic-locking token
// for status updates: a stream is only allowed to mutate the peer's
// status when its own token strictly exceeds the stored token (when connecting)
// or matches it exactly (for disconnects). Zero means "no
// active session". Integer nanoseconds are used so equality is
// precision-safe across drivers, and so the predicates compose to a
// single bigint comparison.
SessionStartedAt int64
// Connected indicates whether peer is connected to the management service or not
Connected bool
// LoginExpired
@@ -356,6 +367,22 @@ func (p *Peer) LoginExpired(expiresIn time.Duration) (bool, time.Duration) {
return timeLeft <= 0, timeLeft
}
// SessionExpiresAt returns the absolute UTC instant at which the peer's SSO
// session expires, derived from LastLogin and the account-level
// PeerLoginExpiration setting. Returns the zero value when login expiration
// does not apply (peer not SSO-registered, peer-level toggle off, or account
// expiry disabled). Callers should treat the zero value as "no deadline".
func (p *Peer) SessionExpiresAt(accountExpirationEnabled bool, expiresIn time.Duration) time.Time {
if !accountExpirationEnabled || !p.AddedWithSSOLogin() || !p.LoginExpirationEnabled {
return time.Time{}
}
last := p.GetLastLogin()
if last.IsZero() {
return time.Time{}
}
return last.Add(expiresIn).UTC()
}
// FQDN returns peers FQDN combined of the peer's DNS label and the system's DNS domain
func (p *Peer) FQDN(dnsDomain string) string {
if dnsDomain == "" {
@@ -375,10 +402,14 @@ func (p *Peer) EventMeta(dnsDomain string) map[string]any {
return meta
}
// Copy PeerStatus
// Copy PeerStatus. SessionStartedAt must be propagated so clone-based
// callers (Peer.Copy, MarkLoginExpired, UpdateLastLogin) don't silently
// reset the fencing token to zero — that would let any subsequent
// SavePeerStatus write reopen the optimistic-lock window.
func (p *PeerStatus) Copy() *PeerStatus {
return &PeerStatus{
LastSeen: p.LastSeen,
SessionStartedAt: p.SessionStartedAt,
Connected: p.Connected,
LoginExpired: p.LoginExpired,
RequiresApproval: p.RequiresApproval,

View File

@@ -498,8 +498,9 @@ func (s *SqlStore) SavePeerStatus(ctx context.Context, accountID, peerID string,
peerCopy.Status = &peerStatus
fieldsToUpdate := []string{
"peer_status_last_seen", "peer_status_connected",
"peer_status_login_expired", "peer_status_required_approval",
"peer_status_last_seen", "peer_status_session_started_at",
"peer_status_connected", "peer_status_login_expired",
"peer_status_requires_approval",
}
result := s.db.Model(&nbpeer.Peer{}).
Select(fieldsToUpdate).
@@ -516,6 +517,69 @@ func (s *SqlStore) SavePeerStatus(ctx context.Context, accountID, peerID string,
return nil
}
// MarkPeerConnectedIfNewerSession is an atomic optimistic-locked update.
// The peer is marked connected with the given session token only when
// the stored SessionStartedAt is strictly smaller than the incoming
// one — equivalently, when no newer stream has already taken ownership.
// The sentinel zero (set on peer creation or after a disconnect) counts
// as the smallest possible token. This is the write half of the
// fencing protocol described on PeerStatus.SessionStartedAt.
//
// The post-write side effects in the caller — geo lookup,
// schedulePeerLoginExpiration, checkAndSchedulePeerInactivityExpiration,
// OnPeersUpdated — all run AFTER this method returns and are deliberately
// outside the database write so they cannot extend the row-lock window.
//
// LastSeen is set to the database's clock (CURRENT_TIMESTAMP) at the
// moment the row is written. The caller never supplies LastSeen because
// the value would otherwise drift under lock contention — a Go-side
// time.Now() taken before the write can land minutes later than the
// actual UPDATE under load, which previously caused real ordering bugs.
func (s *SqlStore) MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error) {
result := s.db.WithContext(ctx).
Model(&nbpeer.Peer{}).
Where(accountAndIDQueryCondition, accountID, peerID).
Where("peer_status_session_started_at < ?", newSessionStartedAt).
Updates(map[string]any{
"peer_status_connected": true,
"peer_status_last_seen": gorm.Expr("CURRENT_TIMESTAMP"),
"peer_status_session_started_at": newSessionStartedAt,
"peer_status_login_expired": false,
})
if result.Error != nil {
return false, status.Errorf(status.Internal, "mark peer connected: %v", result.Error)
}
return result.RowsAffected > 0, nil
}
// MarkPeerDisconnectedIfSameSession is an atomic optimistic-locked update.
// The peer is marked disconnected only when the stored SessionStartedAt
// matches the incoming token — meaning the stream that owns the current
// session is the one ending. If a newer stream has already replaced the
// session, the update is skipped. LastSeen is set to CURRENT_TIMESTAMP at
// write time; see MarkPeerConnectedIfNewerSession for the rationale.
//
// A zero sessionStartedAt is rejected at the call site; the underlying
// WHERE on equality would otherwise match every never-connected peer.
func (s *SqlStore) MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error) {
if sessionStartedAt == 0 {
return false, nil
}
result := s.db.WithContext(ctx).
Model(&nbpeer.Peer{}).
Where(accountAndIDQueryCondition, accountID, peerID).
Where("peer_status_session_started_at = ?", sessionStartedAt).
Updates(map[string]any{
"peer_status_connected": false,
"peer_status_last_seen": gorm.Expr("CURRENT_TIMESTAMP"),
"peer_status_session_started_at": int64(0),
})
if result.Error != nil {
return false, status.Errorf(status.Internal, "mark peer disconnected: %v", result.Error)
}
return result.RowsAffected > 0, nil
}
func (s *SqlStore) SavePeerLocation(ctx context.Context, accountID string, peerWithLocation *nbpeer.Peer) error {
// To maintain data integrity, we create a copy of the peer's location to prevent unintended updates to other fields.
var peerCopy nbpeer.Peer
@@ -1723,9 +1787,10 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee
inactivity_expiration_enabled, last_login, created_at, ephemeral, extra_dns_labels, allow_extra_dns_labels, meta_hostname,
meta_go_os, meta_kernel, meta_core, meta_platform, meta_os, meta_os_version, meta_wt_version, meta_ui_version,
meta_kernel_version, meta_network_addresses, meta_system_serial_number, meta_system_product_name, meta_system_manufacturer,
meta_environment, meta_flags, meta_files, meta_capabilities, peer_status_last_seen, peer_status_connected, peer_status_login_expired,
peer_status_requires_approval, location_connection_ip, location_country_code, location_city_name,
location_geo_name_id, proxy_meta_embedded, proxy_meta_cluster, ipv6 FROM peers WHERE account_id = $1`
meta_environment, meta_flags, meta_files, meta_capabilities, peer_status_last_seen, peer_status_session_started_at,
peer_status_connected, peer_status_login_expired, peer_status_requires_approval, location_connection_ip,
location_country_code, location_city_name, location_geo_name_id, proxy_meta_embedded, proxy_meta_cluster, ipv6
FROM peers WHERE account_id = $1`
rows, err := s.pool.Query(ctx, query, accountID)
if err != nil {
return nil, err
@@ -1738,6 +1803,7 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee
lastLogin, createdAt sql.NullTime
sshEnabled, loginExpirationEnabled, inactivityExpirationEnabled, ephemeral, allowExtraDNSLabels sql.NullBool
peerStatusLastSeen sql.NullTime
peerStatusSessionStartedAt sql.NullInt64
peerStatusConnected, peerStatusLoginExpired, peerStatusRequiresApproval, proxyEmbedded sql.NullBool
ip, extraDNS, netAddr, env, flags, files, capabilities, connIP, ipv6 []byte
metaHostname, metaGoOS, metaKernel, metaCore, metaPlatform sql.NullString
@@ -1752,8 +1818,9 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee
&allowExtraDNSLabels, &metaHostname, &metaGoOS, &metaKernel, &metaCore, &metaPlatform,
&metaOS, &metaOSVersion, &metaWtVersion, &metaUIVersion, &metaKernelVersion, &netAddr,
&metaSystemSerialNumber, &metaSystemProductName, &metaSystemManufacturer, &env, &flags, &files, &capabilities,
&peerStatusLastSeen, &peerStatusConnected, &peerStatusLoginExpired, &peerStatusRequiresApproval, &connIP,
&locationCountryCode, &locationCityName, &locationGeoNameID, &proxyEmbedded, &proxyCluster, &ipv6)
&peerStatusLastSeen, &peerStatusSessionStartedAt, &peerStatusConnected, &peerStatusLoginExpired,
&peerStatusRequiresApproval, &connIP, &locationCountryCode, &locationCityName, &locationGeoNameID,
&proxyEmbedded, &proxyCluster, &ipv6)
if err == nil {
if lastLogin.Valid {
@@ -1780,6 +1847,9 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee
if peerStatusLastSeen.Valid {
p.Status.LastSeen = peerStatusLastSeen.Time
}
if peerStatusSessionStartedAt.Valid {
p.Status.SessionStartedAt = peerStatusSessionStartedAt.Int64
}
if peerStatusConnected.Valid {
p.Status.Connected = peerStatusConnected.Bool
}

View File

@@ -167,6 +167,21 @@ type Store interface {
GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error)
SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error
SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error
// MarkPeerConnectedIfNewerSession sets the peer to connected with the
// given session token, but only when the stored SessionStartedAt is
// strictly less than newSessionStartedAt (the sentinel zero counts as
// "older"). LastSeen is recorded by the database at the moment the
// row is updated — never by the caller — so it always reflects the
// real write time even under lock contention.
// Returns true when the update happened, false when this stream lost
// the race against a newer session.
MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error)
// MarkPeerDisconnectedIfSameSession sets the peer to disconnected and
// resets SessionStartedAt to zero, but only when the stored
// SessionStartedAt equals the given sessionStartedAt. LastSeen is
// recorded by the database. Returns true when the update happened,
// false when a newer session has taken over.
MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error)
SavePeerLocation(ctx context.Context, accountID string, peer *nbpeer.Peer) error
ApproveAccountPeers(ctx context.Context, accountID string) (int, error)
DeletePeer(ctx context.Context, accountID string, peerID string) error

View File

@@ -2878,6 +2878,36 @@ func (mr *MockStoreMockRecorder) SavePeerStatus(ctx, accountID, peerID, status i
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SavePeerStatus", reflect.TypeOf((*MockStore)(nil).SavePeerStatus), ctx, accountID, peerID, status)
}
// MarkPeerConnectedIfNewerSession mocks base method.
func (m *MockStore) MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MarkPeerConnectedIfNewerSession", ctx, accountID, peerID, newSessionStartedAt)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// MarkPeerConnectedIfNewerSession indicates an expected call of MarkPeerConnectedIfNewerSession.
func (mr *MockStoreMockRecorder) MarkPeerConnectedIfNewerSession(ctx, accountID, peerID, newSessionStartedAt interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnectedIfNewerSession", reflect.TypeOf((*MockStore)(nil).MarkPeerConnectedIfNewerSession), ctx, accountID, peerID, newSessionStartedAt)
}
// MarkPeerDisconnectedIfSameSession mocks base method.
func (m *MockStore) MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "MarkPeerDisconnectedIfSameSession", ctx, accountID, peerID, sessionStartedAt)
ret0, _ := ret[0].(bool)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// MarkPeerDisconnectedIfSameSession indicates an expected call of MarkPeerDisconnectedIfSameSession.
func (mr *MockStoreMockRecorder) MarkPeerDisconnectedIfSameSession(ctx, accountID, peerID, sessionStartedAt interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerDisconnectedIfSameSession", reflect.TypeOf((*MockStore)(nil).MarkPeerDisconnectedIfSameSession), ctx, accountID, peerID, sessionStartedAt)
}
// SavePolicy mocks base method.
func (m *MockStore) SavePolicy(ctx context.Context, policy *types2.Policy) error {
m.ctrl.T.Helper()

View File

@@ -16,6 +16,8 @@ type AccountManagerMetrics struct {
getPeerNetworkMapDurationMs metric.Float64Histogram
networkMapObjectCount metric.Int64Histogram
peerMetaUpdateCount metric.Int64Counter
peerStatusUpdateCounter metric.Int64Counter
peerStatusUpdateDurationMs metric.Float64Histogram
}
// NewAccountManagerMetrics creates an instance of AccountManagerMetrics
@@ -64,6 +66,24 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
return nil, err
}
// peerStatusUpdateCounter records every attempt to mark a peer as connected or disconnected
peerStatusUpdateCounter, err := meter.Int64Counter("management.account.peer.status.update.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of peer status update attempts, labeled by operation (connect|disconnect) and outcome (applied|stale|error|peer_not_found)"))
if err != nil {
return nil, err
}
peerStatusUpdateDurationMs, err := meter.Float64Histogram("management.account.peer.status.update.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithExplicitBucketBoundaries(
1, 5, 15, 25, 50, 100, 250, 500, 1000, 2000, 5000,
),
metric.WithDescription("Duration of a peer status update (fence UPDATE + post-write side effects), labeled by operation"))
if err != nil {
return nil, err
}
return &AccountManagerMetrics{
ctx: ctx,
getPeerNetworkMapDurationMs: getPeerNetworkMapDurationMs,
@@ -71,10 +91,35 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
updateAccountPeersCounter: updateAccountPeersCounter,
networkMapObjectCount: networkMapObjectCount,
peerMetaUpdateCount: peerMetaUpdateCount,
peerStatusUpdateCounter: peerStatusUpdateCounter,
peerStatusUpdateDurationMs: peerStatusUpdateDurationMs,
}, nil
}
// PeerStatusOperation labels the kind of fence-locked peer status write.
type PeerStatusOperation string
// PeerStatusOutcome labels how a fence-locked peer status write resolved.
type PeerStatusOutcome string
const (
PeerStatusConnect PeerStatusOperation = "connect"
PeerStatusDisconnect PeerStatusOperation = "disconnect"
// PeerStatusApplied — the fence WHERE matched and the UPDATE landed.
PeerStatusApplied PeerStatusOutcome = "applied"
// PeerStatusStale — the fence WHERE rejected the write because a
// newer session has already taken ownership (connect: stored token
// >= incoming; disconnect: stored token != incoming).
PeerStatusStale PeerStatusOutcome = "stale"
// PeerStatusError — the store returned a non-NotFound error.
PeerStatusError PeerStatusOutcome = "error"
// PeerStatusPeerNotFound — the peer lookup failed (the peer was
// deleted between the gRPC sync handshake and the status write).
PeerStatusPeerNotFound PeerStatusOutcome = "peer_not_found"
)
// CountUpdateAccountPeersDuration counts the duration of updating account peers
func (metrics *AccountManagerMetrics) CountUpdateAccountPeersDuration(duration time.Duration) {
metrics.updateAccountPeersDurationMs.Record(metrics.ctx, float64(duration.Nanoseconds())/1e6)
@@ -104,3 +149,23 @@ func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource,
func (metrics *AccountManagerMetrics) CountPeerMetUpdate() {
metrics.peerMetaUpdateCount.Add(metrics.ctx, 1)
}
// CountPeerStatusUpdate increments the connect/disconnect counter,
// labeled by operation and outcome. Both labels are bounded enums.
func (metrics *AccountManagerMetrics) CountPeerStatusUpdate(op PeerStatusOperation, outcome PeerStatusOutcome) {
metrics.peerStatusUpdateCounter.Add(metrics.ctx, 1,
metric.WithAttributes(
attribute.String("operation", string(op)),
attribute.String("outcome", string(outcome)),
),
)
}
// RecordPeerStatusUpdateDuration records the wall-clock time spent
// running a peer status update (including post-write side effects),
// labeled by operation.
func (metrics *AccountManagerMetrics) RecordPeerStatusUpdateDuration(op PeerStatusOperation, d time.Duration) {
metrics.peerStatusUpdateDurationMs.Record(metrics.ctx, float64(d.Nanoseconds())/1e6,
metric.WithAttributes(attribute.String("operation", string(op))),
)
}

View File

@@ -29,6 +29,7 @@ type MockAppMetrics struct {
StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
AddAccountManagerMetricsFunc func() *AccountManagerMetrics
EphemeralPeersMetricsFunc func() *EphemeralPeersMetrics
}
// GetMeter mocks the GetMeter function of the AppMetrics interface
@@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics {
return nil
}
// EphemeralPeersMetrics mocks the MockAppMetrics function of the EphemeralPeersMetrics interface
func (mock *MockAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics {
if mock.EphemeralPeersMetricsFunc != nil {
return mock.EphemeralPeersMetricsFunc()
}
return nil
}
// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
@@ -114,6 +123,7 @@ type AppMetrics interface {
StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
AccountManagerMetrics() *AccountManagerMetrics
EphemeralPeersMetrics() *EphemeralPeersMetrics
}
// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
@@ -129,6 +139,7 @@ type defaultAppMetrics struct {
storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
accountManagerMetrics *AccountManagerMetrics
ephemeralMetrics *EphemeralPeersMetrics
}
// IDPMetrics returns metrics for the idp package
@@ -161,6 +172,11 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr
return appMetrics.accountManagerMetrics
}
// EphemeralPeersMetrics returns metrics for the ephemeral peer cleanup loop
func (appMetrics *defaultAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics {
return appMetrics.ephemeralMetrics
}
// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
@@ -245,6 +261,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err)
}
ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter)
if err != nil {
return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err)
}
return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
@@ -254,6 +275,7 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
accountManagerMetrics: accountManagerMetrics,
ephemeralMetrics: ephemeralMetrics,
}, nil
}
@@ -290,6 +312,11 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric
return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err)
}
ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter)
if err != nil {
return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err)
}
return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
@@ -300,5 +327,6 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
accountManagerMetrics: accountManagerMetrics,
ephemeralMetrics: ephemeralMetrics,
}, nil
}

View File

@@ -0,0 +1,115 @@
package telemetry
import (
"context"
"go.opentelemetry.io/otel/metric"
)
// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how
// many peers are currently scheduled for deletion, how many tick runs
// the cleaner has performed, how many peers it has removed, and how
// many delete batches failed.
type EphemeralPeersMetrics struct {
ctx context.Context
pending metric.Int64UpDownCounter
cleanupRuns metric.Int64Counter
peersCleaned metric.Int64Counter
errors metric.Int64Counter
}
// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters.
func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) {
pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up"))
if err != nil {
return nil, err
}
cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer"))
if err != nil {
return nil, err
}
peersCleaned, err := meter.Int64Counter("management.ephemeral.peers.cleaned.counter",
metric.WithUnit("1"),
metric.WithDescription("Total number of ephemeral peers deleted by the cleanup loop"))
if err != nil {
return nil, err
}
errors, err := meter.Int64Counter("management.ephemeral.cleanup.errors.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of ephemeral cleanup batches (per account) that failed to delete"))
if err != nil {
return nil, err
}
return &EphemeralPeersMetrics{
ctx: ctx,
pending: pending,
cleanupRuns: cleanupRuns,
peersCleaned: peersCleaned,
errors: errors,
}, nil
}
// All methods are nil-receiver safe so callers that haven't wired metrics
// (tests, self-hosted with metrics off) can invoke them unconditionally.
// IncPending bumps the pending gauge when a peer is added to the cleanup list.
func (m *EphemeralPeersMetrics) IncPending() {
if m == nil {
return
}
m.pending.Add(m.ctx, 1)
}
// AddPending bumps the pending gauge by n — used at startup when the
// initial set of ephemeral peers is loaded from the store.
func (m *EphemeralPeersMetrics) AddPending(n int64) {
if m == nil || n <= 0 {
return
}
m.pending.Add(m.ctx, n)
}
// DecPending decreases the pending gauge — used both when a peer reconnects
// before its deadline (removed from the list) and when a cleanup tick
// actually deletes it.
func (m *EphemeralPeersMetrics) DecPending(n int64) {
if m == nil || n <= 0 {
return
}
m.pending.Add(m.ctx, -n)
}
// CountCleanupRun records one cleanup pass that processed >0 peers. Idle
// ticks (nothing to do) deliberately don't increment so the rate
// reflects useful work.
func (m *EphemeralPeersMetrics) CountCleanupRun() {
if m == nil {
return
}
m.cleanupRuns.Add(m.ctx, 1)
}
// CountPeersCleaned records the number of peers a single tick deleted.
func (m *EphemeralPeersMetrics) CountPeersCleaned(n int64) {
if m == nil || n <= 0 {
return
}
m.peersCleaned.Add(m.ctx, n)
}
// CountCleanupError records a failed delete batch.
func (m *EphemeralPeersMetrics) CountCleanupError() {
if m == nil {
return
}
m.errors.Add(m.ctx, 1)
}

View File

@@ -892,12 +892,7 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peer *nbpeer.P
generateResources(rule, sourcePeers, FirewallRuleDirectionIN)
}
// Auth is collected when this peer serves the rule. For bidirectional
// rules the peer-in-sources side also serves inbound traffic, so it
// must be treated as a destination too.
peerServesAuth := peerInDestinations || (rule.Bidirectional && peerInSources)
if peerServesAuth && rule.Protocol == PolicyRuleProtocolNetbirdSSH {
if peerInDestinations && rule.Protocol == PolicyRuleProtocolNetbirdSSH {
sshEnabled = true
switch {
case len(rule.AuthorizedGroups) > 0:
@@ -929,7 +924,7 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peer *nbpeer.P
default:
authorizedUsers[auth.Wildcard] = a.getAllowedUserIDs()
}
} else if peerServesAuth && policyRuleImpliesLegacySSH(rule) && peer.SSHEnabled {
} else if peerInDestinations && policyRuleImpliesLegacySSH(rule) && peer.SSHEnabled {
sshEnabled = true
authorizedUsers[auth.Wildcard] = a.getAllowedUserIDs()
}

View File

@@ -341,12 +341,7 @@ func (a *Account) getPeersGroupsPoliciesRoutes(
for _, srcGroupID := range rule.Sources {
relevantGroupIDs[srcGroupID] = a.GetGroup(srcGroupID)
}
}
// SSH auth requirements are gathered whenever this peer serves
// the rule. For bidirectional rules the peer-in-sources side
// also serves inbound traffic and must be treated as a destination.
if peerInDestinations || (rule.Bidirectional && peerInSources) {
if rule.Protocol == PolicyRuleProtocolNetbirdSSH {
switch {
case len(rule.AuthorizedGroups) > 0:

View File

@@ -221,12 +221,7 @@ func (c *NetworkMapComponents) getPeerConnectionResources(targetPeerID string) (
generateResources(rule, sourcePeers, FirewallRuleDirectionIN)
}
// Auth is collected when this peer serves the rule. For bidirectional
// rules the peer-in-sources side also serves inbound traffic, so it
// must be treated as a destination too.
peerServesAuth := peerInDestinations || (rule.Bidirectional && peerInSources)
if peerServesAuth && rule.Protocol == PolicyRuleProtocolNetbirdSSH {
if peerInDestinations && rule.Protocol == PolicyRuleProtocolNetbirdSSH {
sshEnabled = true
switch {
case len(rule.AuthorizedGroups) > 0:
@@ -257,7 +252,7 @@ func (c *NetworkMapComponents) getPeerConnectionResources(targetPeerID string) (
default:
authorizedUsers[auth.Wildcard] = c.getAllowedUserIDs()
}
} else if peerServesAuth && policyRuleImpliesLegacySSH(rule) && targetPeer.SSHEnabled {
} else if peerInDestinations && policyRuleImpliesLegacySSH(rule) && targetPeer.SSHEnabled {
sshEnabled = true
authorizedUsers[auth.Wildcard] = c.getAllowedUserIDs()
}
@@ -562,6 +557,7 @@ func (c *NetworkMapComponents) getRoutingPeerRoutes(peerID string) (enabledRoute
return enabledRoutes, disabledRoutes
}
func (c *NetworkMapComponents) filterRoutesByGroups(routes []*route.Route, groupListMap LookupMap) []*route.Route {
var filteredRoutes []*route.Route
for _, r := range routes {

View File

@@ -980,44 +980,6 @@ func TestComponents_SSHAuthorizedUsersContent(t *testing.T) {
assert.True(t, hasRoot || hasAdmin, "AuthorizedUsers should contain 'root' or 'admin' machine user mapping")
}
// TestComponents_SSHAuthorizedUsersBidirectionalSource verifies that a peer
// on the sources side of a bidirectional NetbirdSSH rule receives the rule's
// authorized users. The reverse direction (destinations -> sources) makes
// the source-side peer a destination too, so it must be able to authorize
// inbound SSH from the rule's destinations.
func TestComponents_SSHAuthorizedUsersBidirectionalSource(t *testing.T) {
account, validatedPeers := scalableTestAccountWithoutDefaultPolicy(20, 2)
account.Users["user-dev"] = &types.User{Id: "user-dev", Role: types.UserRoleUser, AccountID: "test-account", AutoGroups: []string{"ssh-users"}}
account.Groups["ssh-users"] = &types.Group{ID: "ssh-users", Name: "SSH Users", Peers: []string{}}
account.Policies = append(account.Policies, &types.Policy{
ID: "policy-ssh-bidir", Name: "Bidirectional SSH", Enabled: true, AccountID: "test-account",
Rules: []*types.PolicyRule{{
ID: "rule-ssh-bidir", Name: "SSH both ways", Enabled: true,
Action: types.PolicyTrafficActionAccept, Protocol: types.PolicyRuleProtocolNetbirdSSH,
Bidirectional: true,
Sources: []string{"group-0"}, Destinations: []string{"group-1"},
AuthorizedGroups: map[string][]string{"ssh-users": {"root"}},
}},
})
nmSrc := componentsNetworkMap(account, "peer-0", validatedPeers)
require.NotNil(t, nmSrc)
assert.True(t, nmSrc.EnableSSH, "source-side peer of bidirectional SSH rule should have SSH enabled")
require.NotEmpty(t, nmSrc.AuthorizedUsers, "source-side peer should receive authorized users from bidirectional rule")
rootUsers, hasRoot := nmSrc.AuthorizedUsers["root"]
require.True(t, hasRoot, "source-side peer should map the 'root' local user")
_, hasDev := rootUsers["user-dev"]
assert.True(t, hasDev, "source-side peer should include 'user-dev' under 'root'")
nmDst := componentsNetworkMap(account, "peer-10", validatedPeers)
require.NotNil(t, nmDst)
assert.True(t, nmDst.EnableSSH, "destination-side peer should also have SSH enabled")
_, hasRoot = nmDst.AuthorizedUsers["root"]
assert.True(t, hasRoot, "destination-side peer should also map the 'root' local user")
}
// TestComponents_SSHLegacyImpliedSSH verifies that a non-SSH ALL protocol policy with
// SSHEnabled peer implies legacy SSH access.
func TestComponents_SSHLegacyImpliedSSH(t *testing.T) {

View File

@@ -16,6 +16,10 @@ type Client interface {
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
Register(setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
Login(sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
// ExtendAuthSession refreshes the peer's SSO session deadline using a fresh JWT.
// Returns the new absolute deadline; zero time when the server reports the peer
// is not eligible for session extension.
ExtendAuthSession(sysInfo *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error)
GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlow() (*proto.PKCEAuthorizationFlow, error)
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)

View File

@@ -607,6 +607,61 @@ func (c *GrpcClient) Login(sysInfo *system.Info, pubSSHKey []byte, dnsLabels dom
return c.login(&proto.LoginRequest{Meta: infoToMetaData(sysInfo), PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
}
// ExtendAuthSession refreshes the peer's SSO session deadline on the management
// server using a freshly issued JWT. The tunnel is untouched: no network map
// sync, no peer reconnect. Returns the new absolute UTC deadline (zero time
// when the server reports the field empty).
func (c *GrpcClient) ExtendAuthSession(sysInfo *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error) {
if !c.ready() {
return nil, errors.New(errMsgNoMgmtConnection)
}
serverKey, err := c.getServerPublicKey()
if err != nil {
return nil, err
}
reqBody, err := encryption.EncryptMessage(*serverKey, c.key, &proto.ExtendAuthSessionRequest{
JwtToken: jwtToken,
Meta: infoToMetaData(sysInfo),
})
if err != nil {
log.Errorf("failed to encrypt extend auth session message: %s", err)
return nil, err
}
var resp *proto.EncryptedMessage
operation := func() error {
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
var err error
resp, err = c.realClient.ExtendAuthSession(mgmCtx, &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: reqBody,
})
if err != nil {
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.Canceled {
return err
}
return backoff.Permanent(err)
}
return nil
}
if err := backoff.Retry(operation, nbgrpc.Backoff(c.ctx)); err != nil {
log.Errorf("failed to extend auth session on Management Service: %v", err)
return nil, err
}
out := &proto.ExtendAuthSessionResponse{}
if err := encryption.DecryptMessage(*serverKey, c.key, resp.Body, out); err != nil {
log.Errorf("failed to decrypt extend auth session response: %s", err)
return nil, err
}
return out, nil
}
// GetDeviceAuthorizationFlow returns a device authorization flow information.
// It also takes care of encrypting and decrypting messages.
func (c *GrpcClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) {

View File

@@ -14,6 +14,7 @@ type MockClient struct {
SyncFunc func(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
RegisterFunc func(setupKey string, jwtToken string, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
LoginFunc func(info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
ExtendAuthSessionFunc func(info *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error)
GetDeviceAuthorizationFlowFunc func() (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlowFunc func() (*proto.PKCEAuthorizationFlow, error)
GetServerURLFunc func() string
@@ -65,6 +66,13 @@ func (m *MockClient) Login(info *system.Info, sshKey []byte, dnsLabels domain.Li
return m.LoginFunc(info, sshKey, dnsLabels)
}
func (m *MockClient) ExtendAuthSession(info *system.Info, jwtToken string) (*proto.ExtendAuthSessionResponse, error) {
if m.ExtendAuthSessionFunc == nil {
return nil, nil
}
return m.ExtendAuthSessionFunc(info, jwtToken)
}
func (m *MockClient) GetDeviceAuthorizationFlow() (*proto.DeviceAuthorizationFlow, error) {
if m.GetDeviceAuthorizationFlowFunc == nil {
return nil, nil

File diff suppressed because it is too large Load Diff

View File

@@ -52,6 +52,14 @@ service ManagementService {
// Executes a job on a target peer (e.g., debug bundle)
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
rpc ExtendAuthSession(EncryptedMessage) returns (EncryptedMessage) {}
// CreateExpose creates a temporary reverse proxy service for a peer
rpc CreateExpose(EncryptedMessage) returns (EncryptedMessage) {}
@@ -133,6 +141,15 @@ message SyncResponse {
// Posture checks to be evaluated by client
repeated Checks Checks = 6;
// 3-state session deadline. Carried on every Sync snapshot so admin-side
// changes propagate live without a client reconnect.
// field unset (nil) → snapshot carries no info; client keeps the
// deadline it already had
// set, seconds=0 nanos=0 → explicit "expiry disabled" or peer is not
// SSO-registered; client clears its anchor
// set, valid timestamp → new absolute UTC deadline
google.protobuf.Timestamp sessionExpiresAt = 7;
}
message SyncMetaRequest {
@@ -244,6 +261,31 @@ message LoginResponse {
PeerConfig peerConfig = 2;
// Posture checks to be evaluated by client
repeated Checks Checks = 3;
// 3-state session deadline; same encoding as SyncResponse.sessionExpiresAt.
// field unset (nil) → no info; client keeps any deadline it had
// set, seconds=0 nanos=0 → explicit "expiry disabled" / non-SSO peer
// set, valid timestamp → new absolute UTC deadline
google.protobuf.Timestamp sessionExpiresAt = 4;
}
// ExtendAuthSessionRequest carries a fresh JWT to refresh the peer's session deadline.
// The encrypted body of an EncryptedMessage with this payload is sent to the
// ExtendAuthSession RPC.
message ExtendAuthSessionRequest {
// SSO token (must be a fresh, valid JWT for the peer's owning user)
string jwtToken = 1;
// Meta data of the peer (used for IdP user info refresh consistent with Login)
PeerSystemMeta meta = 2;
}
// ExtendAuthSessionResponse contains the refreshed session deadline.
message ExtendAuthSessionResponse {
// 3-state session deadline; same encoding as SyncResponse.sessionExpiresAt.
// In practice ExtendAuthSession only succeeds for SSO peers with expiry
// enabled, so this carries a valid timestamp on the success path. The
// 3-state encoding is documented here for symmetry with Login/Sync.
google.protobuf.Timestamp sessionExpiresAt = 1;
}
message ServerKeyResponse {

View File

@@ -52,6 +52,13 @@ type ManagementServiceClient interface {
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
ExtendAuthSession(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
// CreateExpose creates a temporary reverse proxy service for a peer
CreateExpose(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
// RenewExpose extends the TTL of an active expose session
@@ -194,6 +201,15 @@ func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
return m, nil
}
func (c *managementServiceClient) ExtendAuthSession(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error) {
out := new(EncryptedMessage)
err := c.cc.Invoke(ctx, "/management.ManagementService/ExtendAuthSession", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managementServiceClient) CreateExpose(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error) {
out := new(EncryptedMessage)
err := c.cc.Invoke(ctx, "/management.ManagementService/CreateExpose", in, out, opts...)
@@ -259,6 +275,13 @@ type ManagementServiceServer interface {
Logout(context.Context, *EncryptedMessage) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ManagementService_JobServer) error
// ExtendAuthSession refreshes the peer's session expiry deadline using a fresh JWT.
// Same JWT validation pipeline as Login (including jwt.UserID == peer.UserID check),
// but does not redo the network-map sync. Only valid for SSO-registered peers where
// login expiration is enabled. The tunnel remains up.
// EncryptedMessage of the request has a body of ExtendAuthSessionRequest.
// EncryptedMessage of the response has a body of ExtendAuthSessionResponse.
ExtendAuthSession(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
// CreateExpose creates a temporary reverse proxy service for a peer
CreateExpose(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
// RenewExpose extends the TTL of an active expose session
@@ -299,6 +322,9 @@ func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMe
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
return status.Errorf(codes.Unimplemented, "method Job not implemented")
}
func (UnimplementedManagementServiceServer) ExtendAuthSession(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method ExtendAuthSession not implemented")
}
func (UnimplementedManagementServiceServer) CreateExpose(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateExpose not implemented")
}
@@ -494,6 +520,24 @@ func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
return m, nil
}
func _ManagementService_ExtendAuthSession_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagementServiceServer).ExtendAuthSession(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/management.ManagementService/ExtendAuthSession",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagementServiceServer).ExtendAuthSession(ctx, req.(*EncryptedMessage))
}
return interceptor(ctx, in, info, handler)
}
func _ManagementService_CreateExpose_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
@@ -583,6 +627,10 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
MethodName: "Logout",
Handler: _ManagementService_Logout_Handler,
},
{
MethodName: "ExtendAuthSession",
Handler: _ManagementService_ExtendAuthSession_Handler,
},
{
MethodName: "CreateExpose",
Handler: _ManagementService_CreateExpose_Handler,