From 22e2519d7113dffec718198e54474cc0a6d71c87 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Sat, 16 May 2026 22:51:48 +0900 Subject: [PATCH 01/10] [management] Avoid peer IP reallocation when account settings update preserves the network range (#6173) --- management/server/account.go | 37 +++++++++++-- management/server/account_test.go | 90 +++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+), 3 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index 77a46a069..e7b4acaac 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -291,10 +291,15 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco return nil, status.NewPermissionDeniedError() } + // Canonicalize the incoming range so a caller-supplied prefix with host bits + // (e.g. 100.64.1.1/16) compares equal to the masked form stored on network.Net. + newSettings.NetworkRange = newSettings.NetworkRange.Masked() + var oldSettings *types.Settings var updateAccountPeers bool var groupChangesAffectPeers bool var reloadReverseProxy bool + var effectiveOldNetworkRange netip.Prefix err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { var groupsUpdated bool @@ -308,6 +313,16 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco return err } + // No lock: the transaction already holds Settings(Update), and network.Net is + // only mutated by reallocateAccountPeerIPs, which is reachable only through + // this same code path. A Share lock here would extend an unnecessary row lock + // and complicate ordering against updatePeerIPv6InTransaction. + network, err := transaction.GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) + if err != nil { + return fmt.Errorf("get account network: %w", err) + } + effectiveOldNetworkRange = prefixFromIPNet(network.Net) + if oldSettings.Extra != nil && newSettings.Extra != nil && oldSettings.Extra.PeerApprovalEnabled && !newSettings.Extra.PeerApprovalEnabled { approvedCount, err := transaction.ApproveAccountPeers(ctx, accountID) @@ -321,7 +336,7 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco } } - if oldSettings.NetworkRange != newSettings.NetworkRange { + if newSettings.NetworkRange.IsValid() && newSettings.NetworkRange != effectiveOldNetworkRange { if err = am.reallocateAccountPeerIPs(ctx, transaction, accountID, newSettings.NetworkRange); err != nil { return err } @@ -396,9 +411,9 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco } am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountDNSDomainUpdated, eventMeta) } - if oldSettings.NetworkRange != newSettings.NetworkRange { + if newSettings.NetworkRange.IsValid() && newSettings.NetworkRange != effectiveOldNetworkRange { eventMeta := map[string]any{ - "old_network_range": oldSettings.NetworkRange.String(), + "old_network_range": effectiveOldNetworkRange.String(), "new_network_range": newSettings.NetworkRange.String(), } am.StoreEvent(ctx, userID, accountID, accountID, activity.AccountNetworkRangeUpdated, eventMeta) @@ -443,6 +458,22 @@ func ipv6SettingsChanged(old, updated *types.Settings) bool { return !slices.Equal(oldGroups, newGroups) } +// prefixFromIPNet returns the overlay prefix actually allocated on the account +// network, or an invalid prefix if none is set. Settings.NetworkRange is a +// user-facing override that is empty on legacy accounts, so the effective +// range must be read from network.Net to compare against an incoming update. +func prefixFromIPNet(ipNet net.IPNet) netip.Prefix { + if ipNet.IP == nil { + return netip.Prefix{} + } + addr, ok := netip.AddrFromSlice(ipNet.IP) + if !ok { + return netip.Prefix{} + } + ones, _ := ipNet.Mask.Size() + return netip.PrefixFrom(addr.Unmap(), ones) +} + func (am *DefaultAccountManager) validateSettingsUpdate(ctx context.Context, transaction store.Store, newSettings, oldSettings *types.Settings, userID, accountID string) error { halfYearLimit := 180 * 24 * time.Hour if newSettings.PeerLoginExpiration > halfYearLimit { diff --git a/management/server/account_test.go b/management/server/account_test.go index 65b27df49..60720faa6 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -3970,6 +3970,96 @@ func TestDefaultAccountManager_UpdateAccountSettings_NetworkRangeChange(t *testi } } +// TestDefaultAccountManager_UpdateAccountSettings_NetworkRangePreserved guards against +// peer IP reallocation when a settings update carries the network range that is already +// in use. Legacy accounts have Settings.NetworkRange unset in the DB while network.Net +// holds the actual allocated overlay; the dashboard backfills the GET response from +// network.Net and echoes the value back on PUT, so the diff must be against the +// effective range to avoid renumbering every peer on an unrelated settings change. +func TestDefaultAccountManager_UpdateAccountSettings_NetworkRangePreserved(t *testing.T) { + manager, _, account, peer1, peer2, peer3 := setupNetworkMapTest(t) + ctx := context.Background() + + settings, err := manager.Store.GetAccountSettings(ctx, store.LockingStrengthNone, account.Id) + require.NoError(t, err) + require.False(t, settings.NetworkRange.IsValid(), "precondition: new accounts leave Settings.NetworkRange unset") + + network, err := manager.Store.GetAccountNetwork(ctx, store.LockingStrengthNone, account.Id) + require.NoError(t, err) + require.NotNil(t, network.Net.IP, "precondition: network.Net should be allocated") + addr, ok := netip.AddrFromSlice(network.Net.IP) + require.True(t, ok) + ones, _ := network.Net.Mask.Size() + effective := netip.PrefixFrom(addr.Unmap(), ones) + require.True(t, effective.IsValid()) + + before := map[string]netip.Addr{peer1.ID: peer1.IP, peer2.ID: peer2.IP, peer3.ID: peer3.IP} + + // Round-trip the effective range as if the dashboard echoed back the GET-backfilled value. + _, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{ + PeerLoginExpirationEnabled: true, + PeerLoginExpiration: types.DefaultPeerLoginExpiration, + NetworkRange: effective, + Extra: &types.ExtraSettings{}, + }) + require.NoError(t, err) + + peers, err := manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "") + require.NoError(t, err) + require.Len(t, peers, len(before)) + for _, p := range peers { + assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change when range matches effective", p.ID) + } + + // Carrying the same range with host bits set must also be a no-op once canonicalized. + hostBitsForm := netip.PrefixFrom(peer1.IP, ones) + require.NotEqual(t, effective, hostBitsForm, "precondition: host-bit form should differ before masking") + _, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{ + PeerLoginExpirationEnabled: true, + PeerLoginExpiration: types.DefaultPeerLoginExpiration, + NetworkRange: hostBitsForm, + Extra: &types.ExtraSettings{}, + }) + require.NoError(t, err) + + peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "") + require.NoError(t, err) + for _, p := range peers { + assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change for host-bit-set equivalent range", p.ID) + } + + // Omitting NetworkRange (invalid prefix) must also be a no-op. + _, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{ + PeerLoginExpirationEnabled: true, + PeerLoginExpiration: types.DefaultPeerLoginExpiration, + Extra: &types.ExtraSettings{}, + }) + require.NoError(t, err) + + peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "") + require.NoError(t, err) + for _, p := range peers { + assert.Equal(t, before[p.ID], p.IP, "peer %s IP should not change when NetworkRange omitted", p.ID) + } + + // Sanity: an actually different range still triggers reallocation. + newRange := netip.MustParsePrefix("100.99.0.0/16") + _, err = manager.UpdateAccountSettings(ctx, account.Id, userID, &types.Settings{ + PeerLoginExpirationEnabled: true, + PeerLoginExpiration: types.DefaultPeerLoginExpiration, + NetworkRange: newRange, + Extra: &types.ExtraSettings{}, + }) + require.NoError(t, err) + + peers, err = manager.Store.GetAccountPeers(ctx, store.LockingStrengthNone, account.Id, "", "") + require.NoError(t, err) + for _, p := range peers { + assert.True(t, newRange.Contains(p.IP), "peer %s should be in new range %s, got %s", p.ID, newRange, p.IP) + assert.NotEqual(t, before[p.ID], p.IP, "peer %s IP should change on real range update", p.ID) + } +} + func TestDefaultAccountManager_UpdateAccountSettings_IPv6EnabledGroups(t *testing.T) { manager, _, account, peer1, peer2, peer3 := setupNetworkMapTest(t) ctx := context.Background() From 347c5bf317794729a044ce9f866f29e357d386d9 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Sat, 16 May 2026 16:29:01 +0200 Subject: [PATCH 02/10] Avoid context cancellation in `cancelPeerRoutines` (#6175) When closing go routines and handling peer disconnect, we should avoid canceling the flow due to parent gRPC context cancellation. This change triggers disconnection handling with a context that is not bound to the parent gRPC cancellation. --- management/internals/shared/grpc/server.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/management/internals/shared/grpc/server.go b/management/internals/shared/grpc/server.go index 70024bac6..1d8234304 100644 --- a/management/internals/shared/grpc/server.go +++ b/management/internals/shared/grpc/server.go @@ -522,10 +522,11 @@ func (s *Server) sendJob(ctx context.Context, peerKey wgtypes.Key, job *job.Even } func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) { - unlock := s.acquirePeerLockByUID(ctx, peer.Key) + uncanceledCTX := context.WithoutCancel(ctx) + unlock := s.acquirePeerLockByUID(uncanceledCTX, peer.Key) defer unlock() - s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, streamStartTime) + s.cancelPeerRoutinesWithoutLock(uncanceledCTX, accountID, peer, streamStartTime) } func (s *Server) cancelPeerRoutinesWithoutLock(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) { From 3f91f49277e1841bdfccda06ae7baa0430e6de2e Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Sat, 16 May 2026 23:52:57 +0900 Subject: [PATCH 03/10] Clean up legacy 32-bit and HKCU registry entries on Windows install (#6176) --- client/installer.nsis | 23 ++++++++++++++++++----- client/netbird.wxs | 25 +++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/client/installer.nsis b/client/installer.nsis index 63bff1c5b..3e057df10 100644 --- a/client/installer.nsis +++ b/client/installer.nsis @@ -260,15 +260,23 @@ WriteRegStr ${REG_ROOT} "${UNINSTALL_PATH}" "Publisher" "${COMP_NAME}" WriteRegStr ${REG_ROOT} "${UI_REG_APP_PATH}" "" "$INSTDIR\${UI_APP_EXE}" -; Create autostart registry entry based on checkbox +; Drop Run, App Paths and Uninstall entries left in the 32-bit registry view +; or HKCU by legacy installers. +DetailPrint "Cleaning legacy 32-bit / HKCU entries..." +DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}" +SetRegView 32 +DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" +DeleteRegKey HKLM "${REG_APP_PATH}" +DeleteRegKey HKLM "${UI_REG_APP_PATH}" +DeleteRegKey HKLM "${UNINSTALL_PATH}" +SetRegView 64 + DetailPrint "Autostart enabled: $AutostartEnabled" ${If} $AutostartEnabled == "1" WriteRegStr HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" '"$INSTDIR\${UI_APP_EXE}.exe"' DetailPrint "Added autostart registry entry: $INSTDIR\${UI_APP_EXE}.exe" ${Else} DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" - ; Legacy: pre-HKLM installs wrote to HKCU; clean that up too. - DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}" DetailPrint "Autostart not enabled by user" ${EndIf} @@ -299,11 +307,16 @@ ExecWait '"$INSTDIR\${MAIN_APP_EXE}" service uninstall' DetailPrint "Terminating Netbird UI process..." ExecWait `taskkill /im ${UI_APP_EXE}.exe /f` -; Remove autostart registry entry +; Remove autostart entries from every view a previous installer may have used. DetailPrint "Removing autostart registry entry if exists..." DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" -; Legacy: pre-HKLM installs wrote to HKCU; clean that up too. DeleteRegValue HKCU "${AUTOSTART_REG_KEY}" "${APP_NAME}" +SetRegView 32 +DeleteRegValue HKLM "${AUTOSTART_REG_KEY}" "${APP_NAME}" +DeleteRegKey HKLM "${REG_APP_PATH}" +DeleteRegKey HKLM "${UI_REG_APP_PATH}" +DeleteRegKey HKLM "${UNINSTALL_PATH}" +SetRegView 64 ; Handle data deletion based on checkbox DetailPrint "Checking if user requested data deletion..." diff --git a/client/netbird.wxs b/client/netbird.wxs index 6f18b63b5..96814ce52 100644 --- a/client/netbird.wxs +++ b/client/netbird.wxs @@ -64,6 +64,13 @@ + + + + + @@ -76,10 +83,28 @@ + + + + + + + + + + + From 705f87fc20d4410fd8e21986725b2063f72d864d Mon Sep 17 00:00:00 2001 From: Nicolas Frati Date: Mon, 18 May 2026 12:57:59 +0200 Subject: [PATCH 04/10] [management] fix: device redirect uri wasn't registered (#6191) * fix: device redirect uri wasn't registered * fix lint --- management/server/idp/embedded.go | 27 ++++++++++++++++++++----- management/server/idp/embedded_test.go | 28 ++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 5 deletions(-) diff --git a/management/server/idp/embedded.go b/management/server/idp/embedded.go index a1852a8bc..821e6ff55 100644 --- a/management/server/idp/embedded.go +++ b/management/server/idp/embedded.go @@ -6,7 +6,9 @@ import ( "errors" "fmt" "net/http" + "net/url" "os" + "path" "strings" "github.com/dexidp/dex/storage" @@ -138,10 +140,13 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) { return nil, fmt.Errorf("invalid IdP storage config: %w", err) } - // Build CLI redirect URIs including the device callback (both relative and absolute) + // Build CLI redirect URIs including the device callback. Dex uses the issuer-relative + // path (for example, /oauth2/device/callback) when completing the device flow, so + // include it explicitly in addition to the legacy bare path and absolute URL. cliRedirectURIs := c.CLIRedirectURIs cliRedirectURIs = append(cliRedirectURIs, "/device/callback") - cliRedirectURIs = append(cliRedirectURIs, c.Issuer+"/device/callback") + cliRedirectURIs = append(cliRedirectURIs, issuerRelativeDeviceCallback(c.Issuer)) + cliRedirectURIs = append(cliRedirectURIs, strings.TrimSuffix(c.Issuer, "/")+"/device/callback") // Build dashboard redirect URIs including the OAuth callback for proxy authentication dashboardRedirectURIs := c.DashboardRedirectURIs @@ -154,6 +159,10 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) { // MGMT api and the dashboard, adding baseURL means less configuration for the instance admin dashboardPostLogoutRedirectURIs = append(dashboardPostLogoutRedirectURIs, baseURL) + redirectURIs := make([]string, 0) + redirectURIs = append(redirectURIs, cliRedirectURIs...) + redirectURIs = append(redirectURIs, dashboardRedirectURIs...) + cfg := &dex.YAMLConfig{ Issuer: c.Issuer, Storage: dex.Storage{ @@ -179,14 +188,14 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) { ID: staticClientDashboard, Name: "NetBird Dashboard", Public: true, - RedirectURIs: dashboardRedirectURIs, + RedirectURIs: redirectURIs, PostLogoutRedirectURIs: sanitizePostLogoutRedirectURIs(dashboardPostLogoutRedirectURIs), }, { ID: staticClientCLI, Name: "NetBird CLI", Public: true, - RedirectURIs: cliRedirectURIs, + RedirectURIs: redirectURIs, }, }, StaticConnectors: c.StaticConnectors, @@ -217,6 +226,14 @@ func (c *EmbeddedIdPConfig) ToYAMLConfig() (*dex.YAMLConfig, error) { return cfg, nil } +func issuerRelativeDeviceCallback(issuer string) string { + u, err := url.Parse(issuer) + if err != nil || u.Path == "" { + return "/device/callback" + } + return path.Join(u.Path, "/device/callback") +} + // Due to how the frontend generates the logout, sometimes it appends a trailing slash // and because Dex only allows exact matches, we need to make sure we always have both // versions of each provided uri @@ -299,7 +316,7 @@ func resolveSessionCookieEncryptionKey(configuredKey string) (string, error) { } } - return "", fmt.Errorf("invalid embedded IdP session cookie encryption key: %s (or sessionCookieEncryptionKey) must be 16, 24, or 32 bytes as a raw string or base64-encoded to one of those lengths; got %d raw bytes", sessionCookieEncryptionKeyEnv, len([]byte(key))) + return "", fmt.Errorf("invalid embedded IdP session cookie encryption key:%s (or sessionCookieEncryptionKey) must be 16, 24, or 32 bytes as a raw string or base64-encoded to one of those lengths; got %d raw bytes", sessionCookieEncryptionKeyEnv, len([]byte(key))) } func validSessionCookieEncryptionKeyLength(length int) bool { diff --git a/management/server/idp/embedded_test.go b/management/server/idp/embedded_test.go index 09dc67614..91cd27aee 100644 --- a/management/server/idp/embedded_test.go +++ b/management/server/idp/embedded_test.go @@ -314,6 +314,34 @@ func TestEmbeddedIdPManager_UpdateUserPassword(t *testing.T) { }) } +func TestEmbeddedIdPConfig_ToYAMLConfig_IncludesDeviceCallbackRedirectURI(t *testing.T) { + config := &EmbeddedIdPConfig{ + Enabled: true, + Issuer: "https://example.com/oauth2", + Storage: EmbeddedStorageConfig{ + Type: "sqlite3", + Config: EmbeddedStorageTypeConfig{ + File: filepath.Join(t.TempDir(), "dex.db"), + }, + }, + } + + yamlConfig, err := config.ToYAMLConfig() + require.NoError(t, err) + + var cliRedirectURIs []string + for _, client := range yamlConfig.StaticClients { + if client.ID == staticClientCLI { + cliRedirectURIs = client.RedirectURIs + break + } + } + require.NotEmpty(t, cliRedirectURIs) + assert.Contains(t, cliRedirectURIs, "/device/callback") + assert.Contains(t, cliRedirectURIs, "/oauth2/device/callback") + assert.Contains(t, cliRedirectURIs, "https://example.com/oauth2/device/callback") +} + func TestEmbeddedIdPConfig_ToYAMLConfig_SessionCookieEncryptionKey(t *testing.T) { t.Setenv(sessionCookieEncryptionKeyEnv, "") From 13d32d274f74b700557f8f6a615f56be2ab9c6a5 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 18 May 2026 20:25:12 +0200 Subject: [PATCH 05/10] [management] Fence peer status updates with a session token (#6193) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [management] Fence peer status updates with a session token The connect/disconnect path used a best-effort LastSeen-after-streamStart comparison to decide whether a status update should land. Under contention — a re-sync arriving while the previous stream's disconnect was still in flight, or two management replicas seeing the same peer at once — the check was a read-then-decide-then-write window: any UPDATE in between caused the wrong row to be written. The Go-side time.Now() that fed the comparison also drifted under lock contention, since it was captured seconds before the write actually committed. Replace it with an integer-nanosecond fencing token stored alongside the status. Every gRPC sync stream uses its open time (UnixNano) as its token. Connects only land when the incoming token is strictly greater than the stored one; disconnects only land when the incoming token equals the stored one (i.e. we're the stream that owns the current session). Both are single optimistic-locked UPDATEs — no read-then-write, no transaction wrapper. LastSeen is now written by the database itself (CURRENT_TIMESTAMP). The caller never supplies it, so the value always reflects the real moment of the UPDATE rather than the moment the caller queued the work — which was already off by minutes under heavy lock contention. Side effects (geo lookup, peer-login-expiration scheduling, network-map fan-out) are explicitly documented as running after the fence UPDATE commits, never inside it. Geo also skips the update when realIP equals the stored ConnectionIP, dropping a redundant SavePeerLocation call on same-IP reconnects. Tests cover the three semantic cases (matched disconnect lands, stale disconnect dropped, stale connect dropped) plus a 16-goroutine race test that asserts the highest token always wins. * [management] Add SessionStartedAt to peer status updates Stored `SessionStartedAt` for fencing token propagation across goroutines and updated database queries/functions to handle the new field. Removed outdated geolocation handling logic and adjusted tests for concurrency safety. * Rename `peer_status_required_approval` to `peer_status_requires_approval` in SQL store fields --- management/server/account.go | 29 ++-- management/server/account/manager.go | 3 +- management/server/account/manager_mock.go | 22 ++- management/server/account_test.go | 115 ++++++++++++--- management/server/mock_server/account_mock.go | 24 +++- management/server/peer.go | 131 +++++++++--------- management/server/peer/peer.go | 19 ++- management/server/store/sql_store.go | 84 ++++++++++- management/server/store/store.go | 15 ++ management/server/store/store_mock.go | 30 ++++ 10 files changed, 354 insertions(+), 118 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index e7b4acaac..8e4e595f0 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -1868,35 +1868,32 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth auth.UserAu return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain } +// SyncAndMarkPeer is the per-Sync entry point: it refreshes the peer's +// network map and then marks the peer connected with a session token +// derived from syncTime (the moment the gRPC stream opened). Any +// concurrent stream that started earlier loses the optimistic-lock race +// in MarkPeerConnected and bails without writing. func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) { peer, netMap, postureChecks, dnsfwdPort, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID) if err != nil { return nil, nil, nil, 0, fmt.Errorf("error syncing peer: %w", err) } - err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID, syncTime) - if err != nil { + if err := am.MarkPeerConnected(ctx, peerPubKey, realIP, accountID, syncTime.UnixNano()); err != nil { log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err) } return peer, netMap, postureChecks, dnsfwdPort, nil } +// OnPeerDisconnected is invoked when a sync stream ends. It marks the +// peer disconnected only when the stored SessionStartedAt matches the +// nanosecond token derived from streamStartTime — i.e. only when this +// is the stream that currently owns the peer's session. A mismatch +// means a newer stream has already replaced us, so the disconnect is +// dropped. func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error { - peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) - if err != nil { - log.WithContext(ctx).Warnf("failed to get peer %s for disconnect check: %v", peerPubKey, err) - return nil - } - - if peer.Status.LastSeen.After(streamStartTime) { - log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s > streamStart=%s), skipping disconnect", - peerPubKey, peer.Status.LastSeen.Format(time.RFC3339), streamStartTime.Format(time.RFC3339)) - return nil - } - - err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID, time.Now().UTC()) - if err != nil { + if err := am.MarkPeerDisconnected(ctx, peerPubKey, accountID, streamStartTime.UnixNano()); err != nil { log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err) } return nil diff --git a/management/server/account/manager.go b/management/server/account/manager.go index 71af0645c..ae3de8d79 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -61,7 +61,8 @@ type Manager interface { GetUserFromUserAuth(ctx context.Context, userAuth auth.UserAuth) (*types.User, error) ListUsers(ctx context.Context, accountID string) ([]*types.User, error) GetPeers(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error) - MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error + MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error + MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error DeletePeer(ctx context.Context, accountID, peerID, userID string) error UpdatePeer(ctx context.Context, accountID, userID string, p *nbpeer.Peer) (*nbpeer.Peer, error) UpdatePeerIP(ctx context.Context, accountID, userID, peerID string, newIP netip.Addr) error diff --git a/management/server/account/manager_mock.go b/management/server/account/manager_mock.go index 7ffc41d73..0486e63ec 100644 --- a/management/server/account/manager_mock.go +++ b/management/server/account/manager_mock.go @@ -1305,17 +1305,31 @@ func (mr *MockManagerMockRecorder) LoginPeer(ctx, login interface{}) *gomock.Cal } // MarkPeerConnected mocks base method. -func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error { +func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, connected, realIP, accountID, syncTime) + ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, realIP, accountID, sessionStartedAt) ret0, _ := ret[0].(error) return ret0 } // MarkPeerConnected indicates an expected call of MarkPeerConnected. -func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, connected, realIP, accountID, syncTime interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, realIP, accountID, sessionStartedAt interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, connected, realIP, accountID, syncTime) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, realIP, accountID, sessionStartedAt) +} + +// MarkPeerDisconnected mocks base method. +func (m *MockManager) MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MarkPeerDisconnected", ctx, peerKey, accountID, sessionStartedAt) + ret0, _ := ret[0].(error) + return ret0 +} + +// MarkPeerDisconnected indicates an expected call of MarkPeerDisconnected. +func (mr *MockManagerMockRecorder) MarkPeerDisconnected(ctx, peerKey, accountID, sessionStartedAt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerDisconnected", reflect.TypeOf((*MockManager)(nil).MarkPeerDisconnected), ctx, peerKey, accountID, sessionStartedAt) } // OnPeerDisconnected mocks base method. diff --git a/management/server/account_test.go b/management/server/account_test.go index 60720faa6..ba621030c 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1813,7 +1813,7 @@ func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) { accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID}) require.NoError(t, err, "unable to get the account") - err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC()) + err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano()) require.NoError(t, err, "unable to mark peer connected") _, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{ @@ -1884,7 +1884,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing. require.NoError(t, err, "unable to get the account") // when we mark peer as connected, the peer login expiration routine should trigger - err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC()) + err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano()) require.NoError(t, err, "unable to mark peer connected") failed := waitTimeout(wg, time.Second) @@ -1910,15 +1910,16 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) { }, false) require.NoError(t, err, "unable to add peer") - t.Run("disconnect peer when streamStartTime is after LastSeen", func(t *testing.T) { - err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC()) + t.Run("disconnect peer when session token matches", func(t *testing.T) { + streamStartTime := time.Now().UTC() + err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano()) require.NoError(t, err, "unable to mark peer connected") peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err, "unable to get peer") require.True(t, peer.Status.Connected, "peer should be connected") - - streamStartTime := time.Now().UTC() + require.Equal(t, streamStartTime.UnixNano(), peer.Status.SessionStartedAt, + "SessionStartedAt should equal the token we passed in") err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime) require.NoError(t, err) @@ -1926,49 +1927,127 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) { peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err) require.False(t, peer.Status.Connected, "peer should be disconnected") + require.Equal(t, int64(0), peer.Status.SessionStartedAt, "SessionStartedAt should be reset to 0") }) - t.Run("skip disconnect when LastSeen is after streamStartTime (zombie stream protection)", func(t *testing.T) { - err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC()) + t.Run("skip disconnect when stored session is newer (zombie stream protection)", func(t *testing.T) { + // Newer stream wins on connect (sets SessionStartedAt = now ns). + streamStartTime := time.Now().UTC() + err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano()) require.NoError(t, err, "unable to mark peer connected") peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err) require.True(t, peer.Status.Connected, "peer should be connected") - streamStartTime := peer.Status.LastSeen.Add(-1 * time.Hour) + // Older stream tries to mark disconnect with its own (older) session token — + // fencing kicks in and the write is dropped. + staleStreamStartTime := streamStartTime.Add(-1 * time.Hour) - err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime) + err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, staleStreamStartTime) require.NoError(t, err) peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err) require.True(t, peer.Status.Connected, - "peer should remain connected because LastSeen > streamStartTime (zombie stream protection)") + "peer should remain connected because the stored session is newer than the disconnect token") + require.Equal(t, streamStartTime.UnixNano(), peer.Status.SessionStartedAt, + "SessionStartedAt should still hold the winning stream's token") }) - t.Run("skip stale connect when peer already has newer LastSeen (blocked goroutine protection)", func(t *testing.T) { + t.Run("skip stale connect when stored session is newer (blocked goroutine protection)", func(t *testing.T) { node2SyncTime := time.Now().UTC() - err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node2SyncTime) + err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node2SyncTime.UnixNano()) require.NoError(t, err, "node 2 should connect peer") peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err) require.True(t, peer.Status.Connected, "peer should be connected") - require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(), "LastSeen should be node2SyncTime") + require.Equal(t, node2SyncTime.UnixNano(), peer.Status.SessionStartedAt, + "SessionStartedAt should equal node2SyncTime token") node1StaleSyncTime := node2SyncTime.Add(-1 * time.Minute) - err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node1StaleSyncTime) + err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node1StaleSyncTime.UnixNano()) require.NoError(t, err, "stale connect should not return error") peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) require.NoError(t, err) require.True(t, peer.Status.Connected, "peer should still be connected") - require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(), - "LastSeen should NOT be overwritten by stale syncTime from blocked goroutine") + require.Equal(t, node2SyncTime.UnixNano(), peer.Status.SessionStartedAt, + "SessionStartedAt should NOT be overwritten by stale token from blocked goroutine") }) } +// TestDefaultAccountManager_MarkPeerConnected_ConcurrentRace exercises the +// fencing protocol under contention: many goroutines race to mark the +// same peer connected with distinct session tokens at the same time. +// The contract is that the highest token always wins and is what remains +// in the store, regardless of execution order. +func TestDefaultAccountManager_MarkPeerConnected_ConcurrentRace(t *testing.T) { + manager, _, err := createManager(t) + require.NoError(t, err, "unable to create account manager") + + accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID}) + require.NoError(t, err, "unable to get account") + + key, err := wgtypes.GenerateKey() + require.NoError(t, err, "unable to generate WireGuard key") + peerPubKey := key.PublicKey().String() + + _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{ + Key: peerPubKey, + Meta: nbpeer.PeerSystemMeta{Hostname: "race-peer"}, + }, false) + require.NoError(t, err, "unable to add peer") + + const workers = 16 + base := time.Now().UTC().UnixNano() + tokens := make([]int64, workers) + for i := range tokens { + // Spread tokens by 1ms so the comparison is unambiguous; the + // largest is index workers-1. + tokens[i] = base + int64(i)*int64(time.Millisecond) + } + expected := tokens[workers-1] + + var ready sync.WaitGroup + ready.Add(workers) + var start sync.WaitGroup + start.Add(1) + var done sync.WaitGroup + done.Add(workers) + + // require.* calls t.FailNow which is documented as unsafe from + // non-test goroutines (it calls runtime.Goexit on the wrong stack and + // races with the WaitGroup). Collect errors here and assert from the + // main goroutine after done.Wait(). + errs := make(chan error, workers) + + for i := 0; i < workers; i++ { + token := tokens[i] + go func() { + defer done.Done() + ready.Done() + start.Wait() + errs <- manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, token) + }() + } + + ready.Wait() + start.Done() + done.Wait() + close(errs) + for err := range errs { + require.NoError(t, err, "MarkPeerConnected must not error under contention") + } + + peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) + require.NoError(t, err) + require.True(t, peer.Status.Connected, "peer should be connected after the race") + require.Equal(t, expected, peer.Status.SessionStartedAt, + "the largest token must win regardless of execution order") +} + func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *testing.T) { manager, _, err := createManager(t) require.NoError(t, err, "unable to create account manager") @@ -1991,7 +2070,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test account, err := manager.Store.GetAccount(context.Background(), accountID) require.NoError(t, err, "unable to get the account") - err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC()) + err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano()) require.NoError(t, err, "unable to mark peer connected") wg := &sync.WaitGroup{} diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 08091d4b7..aba408184 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -38,7 +38,8 @@ type MockAccountManager struct { GetUserFromUserAuthFunc func(ctx context.Context, userAuth auth.UserAuth) (*types.User, error) ListUsersFunc func(ctx context.Context, accountID string) ([]*types.User, error) GetPeersFunc func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error) - MarkPeerConnectedFunc func(ctx context.Context, peerKey string, connected bool, realIP net.IP, syncTime time.Time) error + MarkPeerConnectedFunc func(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error + MarkPeerDisconnectedFunc func(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error SyncAndMarkPeerFunc func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) DeletePeerFunc func(ctx context.Context, accountID, peerKey, userID string) error GetNetworkMapFunc func(ctx context.Context, peerKey string) (*types.NetworkMap, error) @@ -227,7 +228,14 @@ func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID str return nil, nil, nil, 0, status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented") } -func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error { +func (am *MockAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error { + // Mirror DefaultAccountManager.OnPeerDisconnected: drive the fencing + // hook so tests that inject MarkPeerDisconnectedFunc actually observe + // disconnect events. Falls through to nil when no hook is set, which + // is the original behaviour. + if am.MarkPeerDisconnectedFunc != nil { + return am.MarkPeerDisconnectedFunc(ctx, peerPubKey, accountID, streamStartTime.UnixNano()) + } return nil } @@ -328,13 +336,21 @@ func (am *MockAccountManager) GetAccountIDByUserID(ctx context.Context, userAuth } // MarkPeerConnected mock implementation of MarkPeerConnected from server.AccountManager interface -func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error { +func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64) error { if am.MarkPeerConnectedFunc != nil { - return am.MarkPeerConnectedFunc(ctx, peerKey, connected, realIP, syncTime) + return am.MarkPeerConnectedFunc(ctx, peerKey, realIP, accountID, sessionStartedAt) } return status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented") } +// MarkPeerDisconnected mock implementation of MarkPeerDisconnected from server.AccountManager interface +func (am *MockAccountManager) MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error { + if am.MarkPeerDisconnectedFunc != nil { + return am.MarkPeerDisconnectedFunc(ctx, peerKey, accountID, sessionStartedAt) + } + return status.Errorf(codes.Unimplemented, "method MarkPeerDisconnected is not implemented") +} + // DeleteAccount mock implementation of DeleteAccount from server.AccountManager interface func (am *MockAccountManager) DeleteAccount(ctx context.Context, accountID, userID string) error { if am.DeleteAccountFunc != nil { diff --git a/management/server/peer.go b/management/server/peer.go index c3b130ba2..4790a5aab 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -16,7 +16,6 @@ import ( "golang.org/x/exp/maps" nbdns "github.com/netbirdio/netbird/dns" - "github.com/netbirdio/netbird/management/server/geolocation" "github.com/netbirdio/netbird/management/server/idp" routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types" "github.com/netbirdio/netbird/management/server/permissions/modules" @@ -63,56 +62,51 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID return am.Store.GetUserPeers(ctx, store.LockingStrengthNone, accountID, userID) } -// MarkPeerConnected marks peer as connected (true) or disconnected (false) -// syncTime is used as the LastSeen timestamp and for stale request detection -func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error { - var peer *nbpeer.Peer - var settings *types.Settings - var expired bool - var err error - var skipped bool - - err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { - peer, err = transaction.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey) - if err != nil { - return err - } - - if connected && !syncTime.After(peer.Status.LastSeen) { - log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s >= syncTime=%s), skipping connect", - peer.ID, peer.Status.LastSeen.Format(time.RFC3339), syncTime.Format(time.RFC3339)) - skipped = true - return nil - } - - expired, err = updatePeerStatusAndLocation(ctx, am.geo, transaction, peer, connected, realIP, accountID, syncTime) - return err - }) - if skipped { - return nil - } +// MarkPeerConnected marks a peer as connected with optimistic-locked +// fencing on PeerStatus.SessionStartedAt. The sessionStartedAt argument +// is the start time of the gRPC sync stream that owns this update, +// expressed as Unix nanoseconds — only the call whose token is greater +// than what's stored wins. LastSeen is written by the database itself; +// we never pass it down. +// +// Disconnects use MarkPeerDisconnected and require the session to match +// exactly; see PeerStatus.SessionStartedAt for the protocol. +func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64) error { + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { return err } + updated, err := am.Store.MarkPeerConnectedIfNewerSession(ctx, accountID, peer.ID, sessionStartedAt) + if err != nil { + return err + } + if !updated { + log.WithContext(ctx).Tracef("peer %s already has a newer session in store, skipping connect", peer.ID) + return nil + } + + if am.geo != nil && realIP != nil { + am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP) + } + + expired := peer.Status != nil && peer.Status.LoginExpired + if peer.AddedWithSSOLogin() { - settings, err = am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID) + settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID) if err != nil { return err } - if peer.LoginExpirationEnabled && settings.PeerLoginExpirationEnabled { am.schedulePeerLoginExpiration(ctx, accountID) } - if peer.InactivityExpirationEnabled && settings.PeerInactivityExpirationEnabled { am.checkAndSchedulePeerInactivityExpiration(ctx, accountID) } } if expired { - err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID}) - if err != nil { + if err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID}); err != nil { return fmt.Errorf("notify network map controller of peer update: %w", err) } } @@ -120,41 +114,46 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK return nil } -func updatePeerStatusAndLocation(ctx context.Context, geo geolocation.Geolocation, transaction store.Store, peer *nbpeer.Peer, connected bool, realIP net.IP, accountID string, syncTime time.Time) (bool, error) { - oldStatus := peer.Status.Copy() - newStatus := oldStatus - newStatus.LastSeen = syncTime - newStatus.Connected = connected - // whenever peer got connected that means that it logged in successfully - if newStatus.Connected { - newStatus.LoginExpired = false - } - peer.Status = newStatus - - if geo != nil && realIP != nil { - location, err := geo.Lookup(realIP) - if err != nil { - log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err) - } else { - peer.Location.ConnectionIP = realIP - peer.Location.CountryCode = location.Country.ISOCode - peer.Location.CityName = location.City.Names.En - peer.Location.GeoNameID = location.City.GeonameID - err = transaction.SavePeerLocation(ctx, accountID, peer) - if err != nil { - log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err) - } - } - } - - log.WithContext(ctx).Debugf("saving peer status for peer %s is connected: %t", peer.ID, connected) - - err := transaction.SavePeerStatus(ctx, accountID, peer.ID, *newStatus) +// MarkPeerDisconnected marks a peer as disconnected, but only when the +// stored session token matches the one passed in. A mismatch means a +// newer stream has already taken ownership of the peer — disconnects from +// the older stream are ignored. LastSeen is written by the database. +func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64) error { + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { - return false, err + return err } - return oldStatus.LoginExpired, nil + updated, err := am.Store.MarkPeerDisconnectedIfSameSession(ctx, accountID, peer.ID, sessionStartedAt) + if err != nil { + return err + } + if !updated { + log.WithContext(ctx).Tracef("peer %s session token mismatch on disconnect (token=%d), skipping", + peer.ID, sessionStartedAt) + } + return nil +} + +// updatePeerLocationIfChanged refreshes the geolocation on a separate +// row update, only when the connection IP actually changed. Geo lookups +// are expensive so we skip same-IP reconnects. +func (am *DefaultAccountManager) updatePeerLocationIfChanged(ctx context.Context, accountID string, peer *nbpeer.Peer, realIP net.IP) { + if peer.Location.ConnectionIP != nil && peer.Location.ConnectionIP.Equal(realIP) { + return + } + location, err := am.geo.Lookup(realIP) + if err != nil { + log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err) + return + } + peer.Location.ConnectionIP = realIP + peer.Location.CountryCode = location.Country.ISOCode + peer.Location.CityName = location.City.Names.En + peer.Location.GeoNameID = location.City.GeonameID + if err := am.Store.SavePeerLocation(ctx, accountID, peer); err != nil { + log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err) + } } // UpdatePeer updates peer. Only Peer.Name, Peer.SSHEnabled, Peer.LoginExpirationEnabled and Peer.InactivityExpirationEnabled can be updated. diff --git a/management/server/peer/peer.go b/management/server/peer/peer.go index 17df761a1..2963dfcbd 100644 --- a/management/server/peer/peer.go +++ b/management/server/peer/peer.go @@ -74,8 +74,19 @@ type ProxyMeta struct { } type PeerStatus struct { //nolint:revive - // LastSeen is the last time peer was connected to the management service + // LastSeen is the last time the peer status was updated (i.e. the last + // time we observed the peer being alive on a sync stream). Written by + // the database (CURRENT_TIMESTAMP) — callers do not supply it. LastSeen time.Time + // SessionStartedAt records when the currently-active sync stream began, + // stored as Unix nanoseconds. It acts as the optimistic-locking token + // for status updates: a stream is only allowed to mutate the peer's + // status when its own token strictly exceeds the stored token (when connecting) + // or matches it exactly (for disconnects). Zero means "no + // active session". Integer nanoseconds are used so equality is + // precision-safe across drivers, and so the predicates compose to a + // single bigint comparison. + SessionStartedAt int64 // Connected indicates whether peer is connected to the management service or not Connected bool // LoginExpired @@ -375,10 +386,14 @@ func (p *Peer) EventMeta(dnsDomain string) map[string]any { return meta } -// Copy PeerStatus +// Copy PeerStatus. SessionStartedAt must be propagated so clone-based +// callers (Peer.Copy, MarkLoginExpired, UpdateLastLogin) don't silently +// reset the fencing token to zero — that would let any subsequent +// SavePeerStatus write reopen the optimistic-lock window. func (p *PeerStatus) Copy() *PeerStatus { return &PeerStatus{ LastSeen: p.LastSeen, + SessionStartedAt: p.SessionStartedAt, Connected: p.Connected, LoginExpired: p.LoginExpired, RequiresApproval: p.RequiresApproval, diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 893ee2168..8cf37de56 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -498,8 +498,9 @@ func (s *SqlStore) SavePeerStatus(ctx context.Context, accountID, peerID string, peerCopy.Status = &peerStatus fieldsToUpdate := []string{ - "peer_status_last_seen", "peer_status_connected", - "peer_status_login_expired", "peer_status_required_approval", + "peer_status_last_seen", "peer_status_session_started_at", + "peer_status_connected", "peer_status_login_expired", + "peer_status_requires_approval", } result := s.db.Model(&nbpeer.Peer{}). Select(fieldsToUpdate). @@ -516,6 +517,69 @@ func (s *SqlStore) SavePeerStatus(ctx context.Context, accountID, peerID string, return nil } +// MarkPeerConnectedIfNewerSession is an atomic optimistic-locked update. +// The peer is marked connected with the given session token only when +// the stored SessionStartedAt is strictly smaller than the incoming +// one — equivalently, when no newer stream has already taken ownership. +// The sentinel zero (set on peer creation or after a disconnect) counts +// as the smallest possible token. This is the write half of the +// fencing protocol described on PeerStatus.SessionStartedAt. +// +// The post-write side effects in the caller — geo lookup, +// schedulePeerLoginExpiration, checkAndSchedulePeerInactivityExpiration, +// OnPeersUpdated — all run AFTER this method returns and are deliberately +// outside the database write so they cannot extend the row-lock window. +// +// LastSeen is set to the database's clock (CURRENT_TIMESTAMP) at the +// moment the row is written. The caller never supplies LastSeen because +// the value would otherwise drift under lock contention — a Go-side +// time.Now() taken before the write can land minutes later than the +// actual UPDATE under load, which previously caused real ordering bugs. +func (s *SqlStore) MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error) { + result := s.db.WithContext(ctx). + Model(&nbpeer.Peer{}). + Where(accountAndIDQueryCondition, accountID, peerID). + Where("peer_status_session_started_at < ?", newSessionStartedAt). + Updates(map[string]any{ + "peer_status_connected": true, + "peer_status_last_seen": gorm.Expr("CURRENT_TIMESTAMP"), + "peer_status_session_started_at": newSessionStartedAt, + "peer_status_login_expired": false, + }) + if result.Error != nil { + return false, status.Errorf(status.Internal, "mark peer connected: %v", result.Error) + } + return result.RowsAffected > 0, nil +} + +// MarkPeerDisconnectedIfSameSession is an atomic optimistic-locked update. +// The peer is marked disconnected only when the stored SessionStartedAt +// matches the incoming token — meaning the stream that owns the current +// session is the one ending. If a newer stream has already replaced the +// session, the update is skipped. LastSeen is set to CURRENT_TIMESTAMP at +// write time; see MarkPeerConnectedIfNewerSession for the rationale. +// +// A zero sessionStartedAt is rejected at the call site; the underlying +// WHERE on equality would otherwise match every never-connected peer. +func (s *SqlStore) MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error) { + if sessionStartedAt == 0 { + return false, nil + } + result := s.db.WithContext(ctx). + Model(&nbpeer.Peer{}). + Where(accountAndIDQueryCondition, accountID, peerID). + Where("peer_status_session_started_at = ?", sessionStartedAt). + Updates(map[string]any{ + "peer_status_connected": false, + "peer_status_last_seen": gorm.Expr("CURRENT_TIMESTAMP"), + "peer_status_session_started_at": int64(0), + }) + if result.Error != nil { + return false, status.Errorf(status.Internal, "mark peer disconnected: %v", result.Error) + } + return result.RowsAffected > 0, nil +} + func (s *SqlStore) SavePeerLocation(ctx context.Context, accountID string, peerWithLocation *nbpeer.Peer) error { // To maintain data integrity, we create a copy of the peer's location to prevent unintended updates to other fields. var peerCopy nbpeer.Peer @@ -1723,9 +1787,10 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee inactivity_expiration_enabled, last_login, created_at, ephemeral, extra_dns_labels, allow_extra_dns_labels, meta_hostname, meta_go_os, meta_kernel, meta_core, meta_platform, meta_os, meta_os_version, meta_wt_version, meta_ui_version, meta_kernel_version, meta_network_addresses, meta_system_serial_number, meta_system_product_name, meta_system_manufacturer, - meta_environment, meta_flags, meta_files, meta_capabilities, peer_status_last_seen, peer_status_connected, peer_status_login_expired, - peer_status_requires_approval, location_connection_ip, location_country_code, location_city_name, - location_geo_name_id, proxy_meta_embedded, proxy_meta_cluster, ipv6 FROM peers WHERE account_id = $1` + meta_environment, meta_flags, meta_files, meta_capabilities, peer_status_last_seen, peer_status_session_started_at, + peer_status_connected, peer_status_login_expired, peer_status_requires_approval, location_connection_ip, + location_country_code, location_city_name, location_geo_name_id, proxy_meta_embedded, proxy_meta_cluster, ipv6 + FROM peers WHERE account_id = $1` rows, err := s.pool.Query(ctx, query, accountID) if err != nil { return nil, err @@ -1738,6 +1803,7 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee lastLogin, createdAt sql.NullTime sshEnabled, loginExpirationEnabled, inactivityExpirationEnabled, ephemeral, allowExtraDNSLabels sql.NullBool peerStatusLastSeen sql.NullTime + peerStatusSessionStartedAt sql.NullInt64 peerStatusConnected, peerStatusLoginExpired, peerStatusRequiresApproval, proxyEmbedded sql.NullBool ip, extraDNS, netAddr, env, flags, files, capabilities, connIP, ipv6 []byte metaHostname, metaGoOS, metaKernel, metaCore, metaPlatform sql.NullString @@ -1752,8 +1818,9 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee &allowExtraDNSLabels, &metaHostname, &metaGoOS, &metaKernel, &metaCore, &metaPlatform, &metaOS, &metaOSVersion, &metaWtVersion, &metaUIVersion, &metaKernelVersion, &netAddr, &metaSystemSerialNumber, &metaSystemProductName, &metaSystemManufacturer, &env, &flags, &files, &capabilities, - &peerStatusLastSeen, &peerStatusConnected, &peerStatusLoginExpired, &peerStatusRequiresApproval, &connIP, - &locationCountryCode, &locationCityName, &locationGeoNameID, &proxyEmbedded, &proxyCluster, &ipv6) + &peerStatusLastSeen, &peerStatusSessionStartedAt, &peerStatusConnected, &peerStatusLoginExpired, + &peerStatusRequiresApproval, &connIP, &locationCountryCode, &locationCityName, &locationGeoNameID, + &proxyEmbedded, &proxyCluster, &ipv6) if err == nil { if lastLogin.Valid { @@ -1780,6 +1847,9 @@ func (s *SqlStore) getPeers(ctx context.Context, accountID string) ([]nbpeer.Pee if peerStatusLastSeen.Valid { p.Status.LastSeen = peerStatusLastSeen.Time } + if peerStatusSessionStartedAt.Valid { + p.Status.SessionStartedAt = peerStatusSessionStartedAt.Int64 + } if peerStatusConnected.Valid { p.Status.Connected = peerStatusConnected.Bool } diff --git a/management/server/store/store.go b/management/server/store/store.go index aa601c33f..a723c1fc3 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -167,6 +167,21 @@ type Store interface { GetAllEphemeralPeers(ctx context.Context, lockStrength LockingStrength) ([]*nbpeer.Peer, error) SavePeer(ctx context.Context, accountID string, peer *nbpeer.Peer) error SavePeerStatus(ctx context.Context, accountID, peerID string, status nbpeer.PeerStatus) error + // MarkPeerConnectedIfNewerSession sets the peer to connected with the + // given session token, but only when the stored SessionStartedAt is + // strictly less than newSessionStartedAt (the sentinel zero counts as + // "older"). LastSeen is recorded by the database at the moment the + // row is updated — never by the caller — so it always reflects the + // real write time even under lock contention. + // Returns true when the update happened, false when this stream lost + // the race against a newer session. + MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error) + // MarkPeerDisconnectedIfSameSession sets the peer to disconnected and + // resets SessionStartedAt to zero, but only when the stored + // SessionStartedAt equals the given sessionStartedAt. LastSeen is + // recorded by the database. Returns true when the update happened, + // false when a newer session has taken over. + MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error) SavePeerLocation(ctx context.Context, accountID string, peer *nbpeer.Peer) error ApproveAccountPeers(ctx context.Context, accountID string) (int, error) DeletePeer(ctx context.Context, accountID string, peerID string) error diff --git a/management/server/store/store_mock.go b/management/server/store/store_mock.go index 9780c521e..d51629606 100644 --- a/management/server/store/store_mock.go +++ b/management/server/store/store_mock.go @@ -2878,6 +2878,36 @@ func (mr *MockStoreMockRecorder) SavePeerStatus(ctx, accountID, peerID, status i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SavePeerStatus", reflect.TypeOf((*MockStore)(nil).SavePeerStatus), ctx, accountID, peerID, status) } +// MarkPeerConnectedIfNewerSession mocks base method. +func (m *MockStore) MarkPeerConnectedIfNewerSession(ctx context.Context, accountID, peerID string, newSessionStartedAt int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MarkPeerConnectedIfNewerSession", ctx, accountID, peerID, newSessionStartedAt) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MarkPeerConnectedIfNewerSession indicates an expected call of MarkPeerConnectedIfNewerSession. +func (mr *MockStoreMockRecorder) MarkPeerConnectedIfNewerSession(ctx, accountID, peerID, newSessionStartedAt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnectedIfNewerSession", reflect.TypeOf((*MockStore)(nil).MarkPeerConnectedIfNewerSession), ctx, accountID, peerID, newSessionStartedAt) +} + +// MarkPeerDisconnectedIfSameSession mocks base method. +func (m *MockStore) MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MarkPeerDisconnectedIfSameSession", ctx, accountID, peerID, sessionStartedAt) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MarkPeerDisconnectedIfSameSession indicates an expected call of MarkPeerDisconnectedIfSameSession. +func (mr *MockStoreMockRecorder) MarkPeerDisconnectedIfSameSession(ctx, accountID, peerID, sessionStartedAt interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerDisconnectedIfSameSession", reflect.TypeOf((*MockStore)(nil).MarkPeerDisconnectedIfSameSession), ctx, accountID, peerID, sessionStartedAt) +} + // SavePolicy mocks base method. func (m *MockStore) SavePolicy(ctx context.Context, policy *types2.Policy) error { m.ctrl.T.Helper() From af24fd779640538c05c5f261a1e9fdf20fe7773f Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 18 May 2026 22:55:19 +0200 Subject: [PATCH 06/10] [management] Add metrics for peer status updates and ephemeral cleanup (#6196) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [management] Add metrics for peer status updates and ephemeral cleanup The session-fenced MarkPeerConnected / MarkPeerDisconnected path and the ephemeral peer cleanup loop both run silently today: when fencing rejects a stale stream, when a cleanup tick deletes peers, or when a batch delete fails, we have no operational signal beyond log lines. Add OpenTelemetry counters and a histogram so the same SLO-style dashboards that already exist for the network-map controller can cover peer connect/disconnect and ephemeral cleanup too. All new attributes are bounded enums: operation in {connect,disconnect} and outcome in {applied,stale,error,peer_not_found}. No account, peer, or user ID is ever written as a metric label — total cardinality is fixed at compile time (8 counter series, 2 histogram series, 4 unlabeled ephemeral series). Metric methods are nil-receiver safe so test composition that doesn't wire telemetry (the bulk of the existing tests) works unchanged. The ephemeral manager exposes a SetMetrics setter rather than taking the collector through its constructor, keeping the constructor signature stable across all test call sites. * [management] Add OpenTelemetry metrics for ephemeral peer cleanup Introduce counters for tracking ephemeral peer cleanup, including peers pending deletion, cleanup runs, successful deletions, and failed batches. Metrics are nil-receiver safe to ensure compatibility with test setups without telemetry. --- .../peers/ephemeral/manager/ephemeral.go | 45 ++++++- management/internals/server/controllers.go | 6 +- management/server/peer.go | 28 +++++ .../telemetry/accountmanager_metrics.go | 65 ++++++++++ management/server/telemetry/app_metrics.go | 28 +++++ .../server/telemetry/ephemeral_metrics.go | 115 ++++++++++++++++++ 6 files changed, 281 insertions(+), 6 deletions(-) create mode 100644 management/server/telemetry/ephemeral_metrics.go diff --git a/management/internals/modules/peers/ephemeral/manager/ephemeral.go b/management/internals/modules/peers/ephemeral/manager/ephemeral.go index 758f643d0..0f902ea70 100644 --- a/management/internals/modules/peers/ephemeral/manager/ephemeral.go +++ b/management/internals/modules/peers/ephemeral/manager/ephemeral.go @@ -11,6 +11,7 @@ import ( "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral" "github.com/netbirdio/netbird/management/server/activity" nbpeer "github.com/netbirdio/netbird/management/server/peer" + "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/store" ) @@ -47,6 +48,11 @@ type EphemeralManager struct { lifeTime time.Duration cleanupWindow time.Duration + + // metrics is nil-safe; methods on telemetry.EphemeralPeersMetrics + // no-op when the receiver is nil so deployments without an app + // metrics provider work unchanged. + metrics *telemetry.EphemeralPeersMetrics } // NewEphemeralManager instantiate new EphemeralManager @@ -60,6 +66,15 @@ func NewEphemeralManager(store store.Store, peersManager peers.Manager) *Ephemer } } +// SetMetrics attaches a metrics collector. Safe to call once before +// LoadInitialPeers; later attachment is fine but earlier loads won't be +// reflected in the gauge. Pass nil to detach. +func (e *EphemeralManager) SetMetrics(m *telemetry.EphemeralPeersMetrics) { + e.peersLock.Lock() + e.metrics = m + e.peersLock.Unlock() +} + // LoadInitialPeers load from the database the ephemeral type of peers and schedule a cleanup procedure to the head // of the linked list (to the most deprecated peer). At the end of cleanup it schedules the next cleanup to the new // head. @@ -97,7 +112,9 @@ func (e *EphemeralManager) OnPeerConnected(ctx context.Context, peer *nbpeer.Pee e.peersLock.Lock() defer e.peersLock.Unlock() - e.removePeer(peer.ID) + if e.removePeer(peer.ID) { + e.metrics.DecPending(1) + } // stop the unnecessary timer if e.headPeer == nil && e.timer != nil { @@ -123,6 +140,7 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer. } e.addPeer(peer.AccountID, peer.ID, e.newDeadLine()) + e.metrics.IncPending() if e.timer == nil { delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow if delay < 0 { @@ -145,6 +163,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) { for _, p := range peers { e.addPeer(p.AccountID, p.ID, t) } + e.metrics.AddPending(int64(len(peers))) log.WithContext(ctx).Debugf("loaded ephemeral peer(s): %d", len(peers)) } @@ -181,6 +200,15 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { e.peersLock.Unlock() + // Drop the gauge by the number of entries we just took off the list, + // regardless of whether the subsequent DeletePeers call succeeds. The + // list invariant is what the gauge tracks; failed delete batches are + // counted separately via CountCleanupError so we can still see them. + if len(deletePeers) > 0 { + e.metrics.CountCleanupRun() + e.metrics.DecPending(int64(len(deletePeers))) + } + peerIDsPerAccount := make(map[string][]string) for id, p := range deletePeers { peerIDsPerAccount[p.accountID] = append(peerIDsPerAccount[p.accountID], id) @@ -191,7 +219,10 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { err := e.peersManager.DeletePeers(ctx, accountID, peerIDs, activity.SystemInitiator, true) if err != nil { log.WithContext(ctx).Errorf("failed to delete ephemeral peers: %s", err) + e.metrics.CountCleanupError() + continue } + e.metrics.CountPeersCleaned(int64(len(peerIDs))) } } @@ -211,9 +242,12 @@ func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline tim e.tailPeer = ep } -func (e *EphemeralManager) removePeer(id string) { +// removePeer drops the entry from the linked list. Returns true if a +// matching entry was found and removed so callers can keep the pending +// metric gauge in sync. +func (e *EphemeralManager) removePeer(id string) bool { if e.headPeer == nil { - return + return false } if e.headPeer.id == id { @@ -221,7 +255,7 @@ func (e *EphemeralManager) removePeer(id string) { if e.tailPeer.id == id { e.tailPeer = nil } - return + return true } for p := e.headPeer; p.next != nil; p = p.next { @@ -231,9 +265,10 @@ func (e *EphemeralManager) removePeer(id string) { e.tailPeer = p } p.next = p.next.next - return + return true } } + return false } func (e *EphemeralManager) isPeerOnList(id string) bool { diff --git a/management/internals/server/controllers.go b/management/internals/server/controllers.go index 89bdf0abe..794c3ebe0 100644 --- a/management/internals/server/controllers.go +++ b/management/internals/server/controllers.go @@ -112,7 +112,11 @@ func (s *BaseServer) AuthManager() auth.Manager { func (s *BaseServer) EphemeralManager() ephemeral.Manager { return Create(s, func() ephemeral.Manager { - return manager.NewEphemeralManager(s.Store(), s.PeersManager()) + em := manager.NewEphemeralManager(s.Store(), s.PeersManager()) + if metrics := s.Metrics(); metrics != nil { + em.SetMetrics(metrics.EphemeralPeersMetrics()) + } + return em }) } diff --git a/management/server/peer.go b/management/server/peer.go index 4790a5aab..34b681f51 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -28,6 +28,7 @@ import ( "github.com/netbirdio/netbird/management/server/activity" nbpeer "github.com/netbirdio/netbird/management/server/peer" + "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/shared/management/status" ) @@ -72,19 +73,32 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID // Disconnects use MarkPeerDisconnected and require the session to match // exactly; see PeerStatus.SessionStartedAt for the protocol. func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64) error { + start := time.Now() + defer func() { + am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusConnect, time.Since(start)) + }() + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { + outcome := telemetry.PeerStatusError + if s, ok := status.FromError(err); ok && s.Type() == status.NotFound { + outcome = telemetry.PeerStatusPeerNotFound + } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, outcome) return err } updated, err := am.Store.MarkPeerConnectedIfNewerSession(ctx, accountID, peer.ID, sessionStartedAt) if err != nil { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusError) return err } if !updated { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusStale) log.WithContext(ctx).Tracef("peer %s already has a newer session in store, skipping connect", peer.ID) return nil } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusApplied) if am.geo != nil && realIP != nil { am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP) @@ -119,19 +133,33 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK // newer stream has already taken ownership of the peer — disconnects from // the older stream are ignored. LastSeen is written by the database. func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64) error { + start := time.Now() + defer func() { + am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusDisconnect, time.Since(start)) + }() + peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey) if err != nil { + outcome := telemetry.PeerStatusError + if s, ok := status.FromError(err); ok && s.Type() == status.NotFound { + outcome = telemetry.PeerStatusPeerNotFound + } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, outcome) return err } updated, err := am.Store.MarkPeerDisconnectedIfSameSession(ctx, accountID, peer.ID, sessionStartedAt) if err != nil { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusError) return err } if !updated { + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusStale) log.WithContext(ctx).Tracef("peer %s session token mismatch on disconnect (token=%d), skipping", peer.ID, sessionStartedAt) + return nil } + am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusDisconnect, telemetry.PeerStatusApplied) return nil } diff --git a/management/server/telemetry/accountmanager_metrics.go b/management/server/telemetry/accountmanager_metrics.go index 518aae7eb..bb6fb7e12 100644 --- a/management/server/telemetry/accountmanager_metrics.go +++ b/management/server/telemetry/accountmanager_metrics.go @@ -16,6 +16,8 @@ type AccountManagerMetrics struct { getPeerNetworkMapDurationMs metric.Float64Histogram networkMapObjectCount metric.Int64Histogram peerMetaUpdateCount metric.Int64Counter + peerStatusUpdateCounter metric.Int64Counter + peerStatusUpdateDurationMs metric.Float64Histogram } // NewAccountManagerMetrics creates an instance of AccountManagerMetrics @@ -64,6 +66,24 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account return nil, err } + // peerStatusUpdateCounter records every attempt to mark a peer as connected or disconnected + peerStatusUpdateCounter, err := meter.Int64Counter("management.account.peer.status.update.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of peer status update attempts, labeled by operation (connect|disconnect) and outcome (applied|stale|error|peer_not_found)")) + if err != nil { + return nil, err + } + + peerStatusUpdateDurationMs, err := meter.Float64Histogram("management.account.peer.status.update.duration.ms", + metric.WithUnit("milliseconds"), + metric.WithExplicitBucketBoundaries( + 1, 5, 15, 25, 50, 100, 250, 500, 1000, 2000, 5000, + ), + metric.WithDescription("Duration of a peer status update (fence UPDATE + post-write side effects), labeled by operation")) + if err != nil { + return nil, err + } + return &AccountManagerMetrics{ ctx: ctx, getPeerNetworkMapDurationMs: getPeerNetworkMapDurationMs, @@ -71,10 +91,35 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account updateAccountPeersCounter: updateAccountPeersCounter, networkMapObjectCount: networkMapObjectCount, peerMetaUpdateCount: peerMetaUpdateCount, + peerStatusUpdateCounter: peerStatusUpdateCounter, + peerStatusUpdateDurationMs: peerStatusUpdateDurationMs, }, nil } +// PeerStatusOperation labels the kind of fence-locked peer status write. +type PeerStatusOperation string + +// PeerStatusOutcome labels how a fence-locked peer status write resolved. +type PeerStatusOutcome string + +const ( + PeerStatusConnect PeerStatusOperation = "connect" + PeerStatusDisconnect PeerStatusOperation = "disconnect" + + // PeerStatusApplied — the fence WHERE matched and the UPDATE landed. + PeerStatusApplied PeerStatusOutcome = "applied" + // PeerStatusStale — the fence WHERE rejected the write because a + // newer session has already taken ownership (connect: stored token + // >= incoming; disconnect: stored token != incoming). + PeerStatusStale PeerStatusOutcome = "stale" + // PeerStatusError — the store returned a non-NotFound error. + PeerStatusError PeerStatusOutcome = "error" + // PeerStatusPeerNotFound — the peer lookup failed (the peer was + // deleted between the gRPC sync handshake and the status write). + PeerStatusPeerNotFound PeerStatusOutcome = "peer_not_found" +) + // CountUpdateAccountPeersDuration counts the duration of updating account peers func (metrics *AccountManagerMetrics) CountUpdateAccountPeersDuration(duration time.Duration) { metrics.updateAccountPeersDurationMs.Record(metrics.ctx, float64(duration.Nanoseconds())/1e6) @@ -104,3 +149,23 @@ func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource, func (metrics *AccountManagerMetrics) CountPeerMetUpdate() { metrics.peerMetaUpdateCount.Add(metrics.ctx, 1) } + +// CountPeerStatusUpdate increments the connect/disconnect counter, +// labeled by operation and outcome. Both labels are bounded enums. +func (metrics *AccountManagerMetrics) CountPeerStatusUpdate(op PeerStatusOperation, outcome PeerStatusOutcome) { + metrics.peerStatusUpdateCounter.Add(metrics.ctx, 1, + metric.WithAttributes( + attribute.String("operation", string(op)), + attribute.String("outcome", string(outcome)), + ), + ) +} + +// RecordPeerStatusUpdateDuration records the wall-clock time spent +// running a peer status update (including post-write side effects), +// labeled by operation. +func (metrics *AccountManagerMetrics) RecordPeerStatusUpdateDuration(op PeerStatusOperation, d time.Duration) { + metrics.peerStatusUpdateDurationMs.Record(metrics.ctx, float64(d.Nanoseconds())/1e6, + metric.WithAttributes(attribute.String("operation", string(op))), + ) +} diff --git a/management/server/telemetry/app_metrics.go b/management/server/telemetry/app_metrics.go index 1fd78bc3a..fd9087a96 100644 --- a/management/server/telemetry/app_metrics.go +++ b/management/server/telemetry/app_metrics.go @@ -29,6 +29,7 @@ type MockAppMetrics struct { StoreMetricsFunc func() *StoreMetrics UpdateChannelMetricsFunc func() *UpdateChannelMetrics AddAccountManagerMetricsFunc func() *AccountManagerMetrics + EphemeralPeersMetricsFunc func() *EphemeralPeersMetrics } // GetMeter mocks the GetMeter function of the AppMetrics interface @@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics { return nil } +// EphemeralPeersMetrics mocks the MockAppMetrics function of the EphemeralPeersMetrics interface +func (mock *MockAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics { + if mock.EphemeralPeersMetricsFunc != nil { + return mock.EphemeralPeersMetricsFunc() + } + return nil +} + // AppMetrics is metrics interface type AppMetrics interface { GetMeter() metric2.Meter @@ -114,6 +123,7 @@ type AppMetrics interface { StoreMetrics() *StoreMetrics UpdateChannelMetrics() *UpdateChannelMetrics AccountManagerMetrics() *AccountManagerMetrics + EphemeralPeersMetrics() *EphemeralPeersMetrics } // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ @@ -129,6 +139,7 @@ type defaultAppMetrics struct { storeMetrics *StoreMetrics updateChannelMetrics *UpdateChannelMetrics accountManagerMetrics *AccountManagerMetrics + ephemeralMetrics *EphemeralPeersMetrics } // IDPMetrics returns metrics for the idp package @@ -161,6 +172,11 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr return appMetrics.accountManagerMetrics } +// EphemeralPeersMetrics returns metrics for the ephemeral peer cleanup loop +func (appMetrics *defaultAppMetrics) EphemeralPeersMetrics() *EphemeralPeersMetrics { + return appMetrics.ephemeralMetrics +} + // Close stop application metrics HTTP handler and closes listener. func (appMetrics *defaultAppMetrics) Close() error { if appMetrics.listener == nil { @@ -245,6 +261,11 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err) } + ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter) + if err != nil { + return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err) + } + return &defaultAppMetrics{ Meter: meter, ctx: ctx, @@ -254,6 +275,7 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { storeMetrics: storeMetrics, updateChannelMetrics: updateChannelMetrics, accountManagerMetrics: accountManagerMetrics, + ephemeralMetrics: ephemeralMetrics, }, nil } @@ -290,6 +312,11 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err) } + ephemeralMetrics, err := NewEphemeralPeersMetrics(ctx, meter) + if err != nil { + return nil, fmt.Errorf("failed to initialize ephemeral peers metrics: %w", err) + } + return &defaultAppMetrics{ Meter: meter, ctx: ctx, @@ -300,5 +327,6 @@ func NewAppMetricsWithMeter(ctx context.Context, meter metric2.Meter) (AppMetric storeMetrics: storeMetrics, updateChannelMetrics: updateChannelMetrics, accountManagerMetrics: accountManagerMetrics, + ephemeralMetrics: ephemeralMetrics, }, nil } diff --git a/management/server/telemetry/ephemeral_metrics.go b/management/server/telemetry/ephemeral_metrics.go new file mode 100644 index 000000000..a7fb432f8 --- /dev/null +++ b/management/server/telemetry/ephemeral_metrics.go @@ -0,0 +1,115 @@ +package telemetry + +import ( + "context" + + "go.opentelemetry.io/otel/metric" +) + +// EphemeralPeersMetrics tracks the ephemeral peer cleanup pipeline: how +// many peers are currently scheduled for deletion, how many tick runs +// the cleaner has performed, how many peers it has removed, and how +// many delete batches failed. +type EphemeralPeersMetrics struct { + ctx context.Context + + pending metric.Int64UpDownCounter + cleanupRuns metric.Int64Counter + peersCleaned metric.Int64Counter + errors metric.Int64Counter +} + +// NewEphemeralPeersMetrics constructs the ephemeral cleanup counters. +func NewEphemeralPeersMetrics(ctx context.Context, meter metric.Meter) (*EphemeralPeersMetrics, error) { + pending, err := meter.Int64UpDownCounter("management.ephemeral.peers.pending", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral peers currently waiting to be cleaned up")) + if err != nil { + return nil, err + } + + cleanupRuns, err := meter.Int64Counter("management.ephemeral.cleanup.runs.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral cleanup ticks that processed at least one peer")) + if err != nil { + return nil, err + } + + peersCleaned, err := meter.Int64Counter("management.ephemeral.peers.cleaned.counter", + metric.WithUnit("1"), + metric.WithDescription("Total number of ephemeral peers deleted by the cleanup loop")) + if err != nil { + return nil, err + } + + errors, err := meter.Int64Counter("management.ephemeral.cleanup.errors.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of ephemeral cleanup batches (per account) that failed to delete")) + if err != nil { + return nil, err + } + + return &EphemeralPeersMetrics{ + ctx: ctx, + pending: pending, + cleanupRuns: cleanupRuns, + peersCleaned: peersCleaned, + errors: errors, + }, nil +} + +// All methods are nil-receiver safe so callers that haven't wired metrics +// (tests, self-hosted with metrics off) can invoke them unconditionally. + +// IncPending bumps the pending gauge when a peer is added to the cleanup list. +func (m *EphemeralPeersMetrics) IncPending() { + if m == nil { + return + } + m.pending.Add(m.ctx, 1) +} + +// AddPending bumps the pending gauge by n — used at startup when the +// initial set of ephemeral peers is loaded from the store. +func (m *EphemeralPeersMetrics) AddPending(n int64) { + if m == nil || n <= 0 { + return + } + m.pending.Add(m.ctx, n) +} + +// DecPending decreases the pending gauge — used both when a peer reconnects +// before its deadline (removed from the list) and when a cleanup tick +// actually deletes it. +func (m *EphemeralPeersMetrics) DecPending(n int64) { + if m == nil || n <= 0 { + return + } + m.pending.Add(m.ctx, -n) +} + +// CountCleanupRun records one cleanup pass that processed >0 peers. Idle +// ticks (nothing to do) deliberately don't increment so the rate +// reflects useful work. +func (m *EphemeralPeersMetrics) CountCleanupRun() { + if m == nil { + return + } + m.cleanupRuns.Add(m.ctx, 1) +} + +// CountPeersCleaned records the number of peers a single tick deleted. +func (m *EphemeralPeersMetrics) CountPeersCleaned(n int64) { + if m == nil || n <= 0 { + return + } + m.peersCleaned.Add(m.ctx, n) +} + +// CountCleanupError records a failed delete batch. +func (m *EphemeralPeersMetrics) CountCleanupError() { + if m == nil { + return + } + m.errors.Add(m.ctx, 1) +} From 80966ab1b09bd86b7a526d9402b6a47438bc0943 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Wed, 20 May 2026 08:25:30 +0200 Subject: [PATCH 07/10] [management] Ensure SessionStartedAt has a default value (#6211) * [management] Ensure SessionStartedAt has a default value Avoid null values for the new column * [management] Add PeerStatus with LastSeen in peer_test * [management] Add migration for PeerStatusSessionStartedAt default value * [management] Add PeerStatus with LastSeen in migration tests --- management/server/migration/migration_test.go | 6 +++++- management/server/peer/peer.go | 2 +- management/server/peer_test.go | 3 +++ management/server/store/store.go | 3 +++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/management/server/migration/migration_test.go b/management/server/migration/migration_test.go index 5e00976c2..cc97c2dff 100644 --- a/management/server/migration/migration_test.go +++ b/management/server/migration/migration_test.go @@ -198,7 +198,11 @@ func TestMigrateNetIPFieldFromBlobToJSON_WithJSONData(t *testing.T) { require.NoError(t, err, "Failed to insert account") account.PeersG = []nbpeer.Peer{ - {AccountID: "1234", Location: nbpeer.Location{ConnectionIP: net.IP{10, 0, 0, 1}}}, + { + AccountID: "1234", + Location: nbpeer.Location{ConnectionIP: net.IP{10, 0, 0, 1}}, + Status: &nbpeer.PeerStatus{LastSeen: time.Now()}, + }, } err = db.Save(account).Error diff --git a/management/server/peer/peer.go b/management/server/peer/peer.go index 2963dfcbd..6294d1c0a 100644 --- a/management/server/peer/peer.go +++ b/management/server/peer/peer.go @@ -86,7 +86,7 @@ type PeerStatus struct { //nolint:revive // active session". Integer nanoseconds are used so equality is // precision-safe across drivers, and so the predicates compose to a // single bigint comparison. - SessionStartedAt int64 + SessionStartedAt int64 `gorm:"not null;default:0"` // Connected indicates whether peer is connected to the management service or not Connected bool // LoginExpired diff --git a/management/server/peer_test.go b/management/server/peer_test.go index 07acf865f..9d6856740 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -2218,6 +2218,9 @@ func Test_IsUniqueConstraintError(t *testing.T) { ID: "test-peer-id", AccountID: "bf1c8084-ba50-4ce7-9439-34653001fc3b", DNSLabel: "test-peer-dns-label", + Status: &nbpeer.PeerStatus{ + LastSeen: time.Now(), + }, } for _, tt := range tests { diff --git a/management/server/store/store.go b/management/server/store/store.go index a723c1fc3..045f1576a 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -471,6 +471,9 @@ func getMigrationsPreAuto(ctx context.Context) []migrationFunc { func(db *gorm.DB) error { return migration.MigrateNewField[types.User](ctx, db, "email", "") }, + func(db *gorm.DB) error { + return migration.MigrateNewField[nbpeer.Peer](ctx, db, "peer_status_session_started_at", int64(0)) + }, func(db *gorm.DB) error { return migration.RemoveDuplicatePeerKeys(ctx, db) }, From d250f92c435bac83fd55f00fad3ee2c292eee910 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Wed, 20 May 2026 10:08:34 +0200 Subject: [PATCH 08/10] feat(reverse-proxy): clusters API surfaces type, online status, and capability flags (#6148) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cluster listing now answers three questions in one round-trip instead of forcing the dashboard to cross-reference the domains API: which clusters can this account see, are they currently up, and what do they support. The ProxyCluster wire type drops the boolean self_hosted in favour of a `type` enum (`account` / `shared`) plus explicit `online`, `supports_custom_ports`, `require_subdomain`, and `supports_crowdsec` fields. Store query reworked so offline clusters still appear (no last_seen WHERE), with online and connected_proxies both derived from the existing 2-min active window via portable CASE expressions; the 1-hour heartbeat reaper still removes long-stale rows. Service manager enriches each cluster with the capability flags via the existing per-cluster lookups (CapabilityProvider now also exposes ClusterSupportsCrowdSec). GetActiveClusterAddresses* keep their tight 2-min filter so service routing and domain enumeration aren't pulled into the wider window. The hard cut removes self_hosted from the response — the dashboard is the only consumer and is updated in the matching PR; no transitional field is shipped. Adds a cross-engine regression test asserting offline clusters surface, connected_proxies counts only fresh proxies, and account-scoped BYOP clusters never leak across accounts. --- .../reverseproxy/proxy/manager/manager.go | 2 +- .../proxy/manager/manager_test.go | 2 +- .../modules/reverseproxy/proxy/proxy.go | 27 ++++- .../modules/reverseproxy/service/interface.go | 2 +- .../reverseproxy/service/interface_mock.go | 72 ++++++------ .../reverseproxy/service/manager/api.go | 14 ++- .../reverseproxy/service/manager/manager.go | 22 +++- .../shared/grpc/proxy_group_access_test.go | 2 +- .../shared/grpc/validate_session_test.go | 2 +- .../proxy/auth_callback_integration_test.go | 2 +- management/server/store/sql_store.go | 64 ++++++++-- .../store/sql_store_proxy_clusters_test.go | 109 ++++++++++++++++++ management/server/store/store.go | 2 +- management/server/store/store_mock.go | 86 +++++++------- proxy/management_integration_test.go | 2 +- shared/management/http/api/openapi.yml | 32 ++++- shared/management/http/api/types.gen.go | 73 +++++++++--- 17 files changed, 393 insertions(+), 122 deletions(-) create mode 100644 management/server/store/sql_store_proxy_clusters_test.go diff --git a/management/internals/modules/reverseproxy/proxy/manager/manager.go b/management/internals/modules/reverseproxy/proxy/manager/manager.go index b72a6ebe5..510500e0c 100644 --- a/management/internals/modules/reverseproxy/proxy/manager/manager.go +++ b/management/internals/modules/reverseproxy/proxy/manager/manager.go @@ -17,7 +17,7 @@ type store interface { UpdateProxyHeartbeat(ctx context.Context, p *proxy.Proxy) error GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error) - GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) + GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) GetClusterSupportsCustomPorts(ctx context.Context, clusterAddr string) *bool GetClusterRequireSubdomain(ctx context.Context, clusterAddr string) *bool GetClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool diff --git a/management/internals/modules/reverseproxy/proxy/manager/manager_test.go b/management/internals/modules/reverseproxy/proxy/manager/manager_test.go index 3c53fe684..3436216b4 100644 --- a/management/internals/modules/reverseproxy/proxy/manager/manager_test.go +++ b/management/internals/modules/reverseproxy/proxy/manager/manager_test.go @@ -57,7 +57,7 @@ func (m *mockStore) GetActiveProxyClusterAddressesForAccount(ctx context.Context } return nil, nil } -func (m *mockStore) GetActiveProxyClusters(_ context.Context, _ string) ([]proxy.Cluster, error) { +func (m *mockStore) GetProxyClusters(_ context.Context, _ string) ([]proxy.Cluster, error) { return nil, nil } func (m *mockStore) CleanupStaleProxies(ctx context.Context, d time.Duration) error { diff --git a/management/internals/modules/reverseproxy/proxy/proxy.go b/management/internals/modules/reverseproxy/proxy/proxy.go index 64394799e..9da7910df 100644 --- a/management/internals/modules/reverseproxy/proxy/proxy.go +++ b/management/internals/modules/reverseproxy/proxy/proxy.go @@ -42,10 +42,35 @@ func (Proxy) TableName() string { return "proxies" } +// ClusterType is the source of a proxy cluster. +type ClusterType string + +const ( + // ClusterTypeAccount is a cluster operated by the account itself (BYOP) — + // at least one proxy row in the cluster carries a non-NULL account_id. + ClusterTypeAccount ClusterType = "account" + // ClusterTypeShared is a cluster operated by NetBird and shared across + // accounts — all proxy rows in the cluster have account_id IS NULL. + ClusterTypeShared ClusterType = "shared" +) + // Cluster represents a group of proxy nodes serving the same address. +// +// Online and ConnectedProxies derive from the same 2-min active window +// the rest of the module uses, but Cluster rows are not gated on it — +// the cluster listing surfaces offline clusters too so operators can +// see and clean them up. The 1-hour heartbeat reaper still bounds the +// table eventually. type Cluster struct { ID string Address string + Type ClusterType + Online bool ConnectedProxies int - SelfHosted bool + // Capability flags. *bool because nil means "no proxy reported a + // capability for this cluster" — the dashboard renders these as + // unknown rather than false. + SupportsCustomPorts *bool + RequireSubdomain *bool + SupportsCrowdSec *bool } diff --git a/management/internals/modules/reverseproxy/service/interface.go b/management/internals/modules/reverseproxy/service/interface.go index 6a94aa32b..dddf6ae8a 100644 --- a/management/internals/modules/reverseproxy/service/interface.go +++ b/management/internals/modules/reverseproxy/service/interface.go @@ -9,7 +9,7 @@ import ( ) type Manager interface { - GetActiveClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) + GetClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) DeleteAccountCluster(ctx context.Context, accountID, userID, clusterAddress string) error GetAllServices(ctx context.Context, accountID, userID string) ([]*Service, error) GetService(ctx context.Context, accountID, userID, serviceID string) (*Service, error) diff --git a/management/internals/modules/reverseproxy/service/interface_mock.go b/management/internals/modules/reverseproxy/service/interface_mock.go index 83b2162ed..24963fe30 100644 --- a/management/internals/modules/reverseproxy/service/interface_mock.go +++ b/management/internals/modules/reverseproxy/service/interface_mock.go @@ -65,20 +65,6 @@ func (mr *MockManagerMockRecorder) CreateServiceFromPeer(ctx, accountID, peerID, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateServiceFromPeer", reflect.TypeOf((*MockManager)(nil).CreateServiceFromPeer), ctx, accountID, peerID, req) } -// DeleteAllServices mocks base method. -func (m *MockManager) DeleteAllServices(ctx context.Context, accountID, userID string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteAllServices", ctx, accountID, userID) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteAllServices indicates an expected call of DeleteAllServices. -func (mr *MockManagerMockRecorder) DeleteAllServices(ctx, accountID, userID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllServices", reflect.TypeOf((*MockManager)(nil).DeleteAllServices), ctx, accountID, userID) -} - // DeleteAccountCluster mocks base method. func (m *MockManager) DeleteAccountCluster(ctx context.Context, accountID, userID, clusterAddress string) error { m.ctrl.T.Helper() @@ -93,6 +79,20 @@ func (mr *MockManagerMockRecorder) DeleteAccountCluster(ctx, accountID, userID, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccountCluster", reflect.TypeOf((*MockManager)(nil).DeleteAccountCluster), ctx, accountID, userID, clusterAddress) } +// DeleteAllServices mocks base method. +func (m *MockManager) DeleteAllServices(ctx context.Context, accountID, userID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteAllServices", ctx, accountID, userID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteAllServices indicates an expected call of DeleteAllServices. +func (mr *MockManagerMockRecorder) DeleteAllServices(ctx, accountID, userID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllServices", reflect.TypeOf((*MockManager)(nil).DeleteAllServices), ctx, accountID, userID) +} + // DeleteService mocks base method. func (m *MockManager) DeleteService(ctx context.Context, accountID, userID, serviceID string) error { m.ctrl.T.Helper() @@ -122,21 +122,6 @@ func (mr *MockManagerMockRecorder) GetAccountServices(ctx, accountID interface{} return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountServices", reflect.TypeOf((*MockManager)(nil).GetAccountServices), ctx, accountID) } -// GetActiveClusters mocks base method. -func (m *MockManager) GetActiveClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetActiveClusters", ctx, accountID, userID) - ret0, _ := ret[0].([]proxy.Cluster) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetActiveClusters indicates an expected call of GetActiveClusters. -func (mr *MockManagerMockRecorder) GetActiveClusters(ctx, accountID, userID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveClusters", reflect.TypeOf((*MockManager)(nil).GetActiveClusters), ctx, accountID, userID) -} - // GetAllServices mocks base method. func (m *MockManager) GetAllServices(ctx context.Context, accountID, userID string) ([]*Service, error) { m.ctrl.T.Helper() @@ -152,19 +137,19 @@ func (mr *MockManagerMockRecorder) GetAllServices(ctx, accountID, userID interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllServices", reflect.TypeOf((*MockManager)(nil).GetAllServices), ctx, accountID, userID) } -// GetServiceByDomain mocks base method. -func (m *MockManager) GetServiceByDomain(ctx context.Context, domain string) (*Service, error) { +// GetClusters mocks base method. +func (m *MockManager) GetClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetServiceByDomain", ctx, domain) - ret0, _ := ret[0].(*Service) + ret := m.ctrl.Call(m, "GetClusters", ctx, accountID, userID) + ret0, _ := ret[0].([]proxy.Cluster) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetServiceByDomain indicates an expected call of GetServiceByDomain. -func (mr *MockManagerMockRecorder) GetServiceByDomain(ctx, domain interface{}) *gomock.Call { +// GetClusters indicates an expected call of GetClusters. +func (mr *MockManagerMockRecorder) GetClusters(ctx, accountID, userID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceByDomain", reflect.TypeOf((*MockManager)(nil).GetServiceByDomain), ctx, domain) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusters", reflect.TypeOf((*MockManager)(nil).GetClusters), ctx, accountID, userID) } // GetGlobalServices mocks base method. @@ -197,6 +182,21 @@ func (mr *MockManagerMockRecorder) GetService(ctx, accountID, userID, serviceID return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetService", reflect.TypeOf((*MockManager)(nil).GetService), ctx, accountID, userID, serviceID) } +// GetServiceByDomain mocks base method. +func (m *MockManager) GetServiceByDomain(ctx context.Context, domain string) (*Service, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetServiceByDomain", ctx, domain) + ret0, _ := ret[0].(*Service) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetServiceByDomain indicates an expected call of GetServiceByDomain. +func (mr *MockManagerMockRecorder) GetServiceByDomain(ctx, domain interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceByDomain", reflect.TypeOf((*MockManager)(nil).GetServiceByDomain), ctx, domain) +} + // GetServiceByID mocks base method. func (m *MockManager) GetServiceByID(ctx context.Context, accountID, serviceID string) (*Service, error) { m.ctrl.T.Helper() diff --git a/management/internals/modules/reverseproxy/service/manager/api.go b/management/internals/modules/reverseproxy/service/manager/api.go index 08272077c..9d93d52ee 100644 --- a/management/internals/modules/reverseproxy/service/manager/api.go +++ b/management/internals/modules/reverseproxy/service/manager/api.go @@ -187,7 +187,7 @@ func (h *handler) getClusters(w http.ResponseWriter, r *http.Request) { return } - clusters, err := h.manager.GetActiveClusters(r.Context(), userAuth.AccountId, userAuth.UserId) + clusters, err := h.manager.GetClusters(r.Context(), userAuth.AccountId, userAuth.UserId) if err != nil { util.WriteError(r.Context(), err, w) return @@ -196,10 +196,14 @@ func (h *handler) getClusters(w http.ResponseWriter, r *http.Request) { apiClusters := make([]api.ProxyCluster, 0, len(clusters)) for _, c := range clusters { apiClusters = append(apiClusters, api.ProxyCluster{ - Id: c.ID, - Address: c.Address, - ConnectedProxies: c.ConnectedProxies, - SelfHosted: c.SelfHosted, + Id: c.ID, + Address: c.Address, + Type: api.ProxyClusterType(c.Type), + Online: c.Online, + ConnectedProxies: c.ConnectedProxies, + SupportsCustomPorts: c.SupportsCustomPorts, + RequireSubdomain: c.RequireSubdomain, + SupportsCrowdsec: c.SupportsCrowdSec, }) } diff --git a/management/internals/modules/reverseproxy/service/manager/manager.go b/management/internals/modules/reverseproxy/service/manager/manager.go index 4a8598afb..ca0c5540f 100644 --- a/management/internals/modules/reverseproxy/service/manager/manager.go +++ b/management/internals/modules/reverseproxy/service/manager/manager.go @@ -81,6 +81,7 @@ type ClusterDeriver interface { type CapabilityProvider interface { ClusterSupportsCustomPorts(ctx context.Context, clusterAddr string) *bool ClusterRequireSubdomain(ctx context.Context, clusterAddr string) *bool + ClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool } type Manager struct { @@ -112,8 +113,12 @@ func (m *Manager) StartExposeReaper(ctx context.Context) { m.exposeReaper.StartExposeReaper(ctx) } -// GetActiveClusters returns all active proxy clusters with their connected proxy count. -func (m *Manager) GetActiveClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) { +// GetClusters returns every proxy cluster visible to the account +// (shared + its own BYOP), regardless of whether any proxy in the +// cluster is currently heartbeating. Each cluster is enriched with the +// capability flags reported by its active proxies so the dashboard can +// render feature support without a second round-trip. +func (m *Manager) GetClusters(ctx context.Context, accountID, userID string) ([]proxy.Cluster, error) { ok, err := m.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Services, operations.Read) if err != nil { return nil, status.NewPermissionValidationError(err) @@ -122,7 +127,18 @@ func (m *Manager) GetActiveClusters(ctx context.Context, accountID, userID strin return nil, status.NewPermissionDeniedError() } - return m.store.GetActiveProxyClusters(ctx, accountID) + clusters, err := m.store.GetProxyClusters(ctx, accountID) + if err != nil { + return nil, err + } + + for i := range clusters { + clusters[i].SupportsCustomPorts = m.capabilities.ClusterSupportsCustomPorts(ctx, clusters[i].Address) + clusters[i].RequireSubdomain = m.capabilities.ClusterRequireSubdomain(ctx, clusters[i].Address) + clusters[i].SupportsCrowdSec = m.capabilities.ClusterSupportsCrowdSec(ctx, clusters[i].Address) + } + + return clusters, nil } // DeleteAccountCluster removes all proxy registrations for the given cluster address diff --git a/management/internals/shared/grpc/proxy_group_access_test.go b/management/internals/shared/grpc/proxy_group_access_test.go index 46dad5b56..5980f8a30 100644 --- a/management/internals/shared/grpc/proxy_group_access_test.go +++ b/management/internals/shared/grpc/proxy_group_access_test.go @@ -109,7 +109,7 @@ func (m *mockReverseProxyManager) GetServiceByDomain(_ context.Context, domain s return nil, errors.New("service not found for domain: " + domain) } -func (m *mockReverseProxyManager) GetActiveClusters(_ context.Context, _, _ string) ([]proxy.Cluster, error) { +func (m *mockReverseProxyManager) GetClusters(_ context.Context, _, _ string) ([]proxy.Cluster, error) { return nil, nil } diff --git a/management/internals/shared/grpc/validate_session_test.go b/management/internals/shared/grpc/validate_session_test.go index 7b7ffcfb2..774c5d1d3 100644 --- a/management/internals/shared/grpc/validate_session_test.go +++ b/management/internals/shared/grpc/validate_session_test.go @@ -322,7 +322,7 @@ func (m *testValidateSessionServiceManager) GetServiceByDomain(ctx context.Conte return m.store.GetServiceByDomain(ctx, domain) } -func (m *testValidateSessionServiceManager) GetActiveClusters(_ context.Context, _, _ string) ([]proxy.Cluster, error) { +func (m *testValidateSessionServiceManager) GetClusters(_ context.Context, _, _ string) ([]proxy.Cluster, error) { return nil, nil } diff --git a/management/server/http/handlers/proxy/auth_callback_integration_test.go b/management/server/http/handlers/proxy/auth_callback_integration_test.go index 30d8aa0e7..f08d5daf1 100644 --- a/management/server/http/handlers/proxy/auth_callback_integration_test.go +++ b/management/server/http/handlers/proxy/auth_callback_integration_test.go @@ -444,7 +444,7 @@ func (m *testServiceManager) GetServiceByDomain(ctx context.Context, domain stri return m.store.GetServiceByDomain(ctx, domain) } -func (m *testServiceManager) GetActiveClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) { +func (m *testServiceManager) GetClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) { return nil, nil } diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 8cf37de56..f3c6b741b 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -5736,19 +5736,67 @@ func (s *SqlStore) DeleteAccountCluster(ctx context.Context, clusterAddress, acc return nil } -func (s *SqlStore) GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) { - var clusters []proxy.Cluster +// GetProxyClusters returns every cluster the account can see (shared +// plus its own BYOP), regardless of whether any proxy in the cluster +// is currently heartbeating. Online and ConnectedProxies are derived +// from the 2-min active window so the dashboard can render offline +// clusters distinctly; the 1-hour heartbeat reaper still removes rows +// that go quiet for too long. +// +// AccountOwned is determined by whether any proxy row in the group +// carries a non-NULL account_id; the caller maps that to Cluster.Type. +// Capability flags are NOT filled here — the handler enriches them via +// the per-cluster capability lookups. +func (s *SqlStore) GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) { + activeCutoff := time.Now().Add(-proxyActiveThreshold) + type clusterRow struct { + ID string + Address string + ConnectedProxies int + Online bool + AccountOwned bool + } + + var rows []clusterRow result := s.db.Model(&proxy.Proxy{}). - Select("MIN(id) as id, cluster_address as address, COUNT(*) as connected_proxies, COUNT(account_id) > 0 as self_hosted"). - Where("status = ? AND last_seen > ? AND (account_id IS NULL OR account_id = ?)", - proxy.StatusConnected, time.Now().Add(-proxyActiveThreshold), accountID). + Select( + "MIN(id) AS id, "+ + "cluster_address AS address, "+ + // COUNT(CASE WHEN ... THEN 1 END) counts only non-NULL — i.e. only + // rows that satisfy the predicate — so it works portably across + // sqlite/postgres/mysql without dialect-specific FILTER syntax. + "COUNT(CASE WHEN status = ? AND last_seen > ? THEN 1 END) AS connected_proxies, "+ + // MAX(CASE …) > 0 expresses BOOL_OR in a way Postgres tolerates + // (Postgres can't MAX a boolean column). + "MAX(CASE WHEN status = ? AND last_seen > ? THEN 1 ELSE 0 END) > 0 AS online, "+ + "MAX(CASE WHEN account_id IS NOT NULL THEN 1 ELSE 0 END) > 0 AS account_owned", + proxy.StatusConnected, activeCutoff, + proxy.StatusConnected, activeCutoff, + ). + Where("account_id IS NULL OR account_id = ?", accountID). Group("cluster_address"). - Scan(&clusters) + Scan(&rows) if result.Error != nil { - log.WithContext(ctx).Errorf("failed to get active proxy clusters: %v", result.Error) - return nil, status.Errorf(status.Internal, "get active proxy clusters") + log.WithContext(ctx).Errorf("failed to get proxy clusters: %v", result.Error) + return nil, status.Errorf(status.Internal, "get proxy clusters") + } + + clusters := make([]proxy.Cluster, 0, len(rows)) + for _, r := range rows { + c := proxy.Cluster{ + ID: r.ID, + Address: r.Address, + Online: r.Online, + ConnectedProxies: r.ConnectedProxies, + } + if r.AccountOwned { + c.Type = proxy.ClusterTypeAccount + } else { + c.Type = proxy.ClusterTypeShared + } + clusters = append(clusters, c) } return clusters, nil diff --git a/management/server/store/sql_store_proxy_clusters_test.go b/management/server/store/sql_store_proxy_clusters_test.go new file mode 100644 index 000000000..cdacfedae --- /dev/null +++ b/management/server/store/sql_store_proxy_clusters_test.go @@ -0,0 +1,109 @@ +package store + +import ( + "context" + "os" + "runtime" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + rpproxy "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy" +) + +// TestSqlStore_GetProxyClusters_DerivesOnlineAndType guards the +// account-visible cluster list against silent regressions in two +// dimensions: +// +// 1. Online derivation: a cluster with one stale and one fresh proxy +// is online and counts only the fresh proxy; a cluster whose +// proxies all heartbeated outside the 2-min window appears offline +// with connected_proxies = 0 (rather than disappearing, which is +// what the old query did). +// 2. Type derivation: a cluster scoped to the calling account is +// reported as `account`; a cluster with account_id IS NULL is +// reported as `shared`. Clusters scoped to other accounts must not +// leak into the result. +// +// Capability flags are intentionally not asserted here — they're filled +// by the manager (handler) layer from the per-cluster capability +// lookups, not by the store query. +func TestSqlStore_GetProxyClusters_DerivesOnlineAndType(t *testing.T) { + if (os.Getenv("CI") == "true" && runtime.GOOS == "darwin") || runtime.GOOS == "windows" { + t.Skip("skip CI tests on darwin and windows") + } + + runTestForAllEngines(t, "", func(t *testing.T, store Store) { + ctx := context.Background() + accountID := "acct-clusters" + require.NoError(t, store.SaveAccount(ctx, newAccountWithId(ctx, accountID, "user-1", ""))) + + otherAccountID := "acct-other" + require.NoError(t, store.SaveAccount(ctx, newAccountWithId(ctx, otherAccountID, "user-2", ""))) + + acctID := accountID + otherID := otherAccountID + + fresh := time.Now().Add(-30 * time.Second) + stale := time.Now().Add(-30 * time.Minute) + + mustSave := func(id, cluster string, accID *string, status string, lastSeen time.Time) { + require.NoError(t, store.SaveProxy(ctx, &rpproxy.Proxy{ + ID: id, + SessionID: id + "-sess", + ClusterAddress: cluster, + IPAddress: "10.0.0.1", + AccountID: accID, + LastSeen: lastSeen, + Status: status, + })) + } + + // shared-mixed: one fresh + one stale proxy → online, connected=1 + mustSave("p-shared-fresh", "shared-mixed.netbird.io", nil, rpproxy.StatusConnected, fresh) + mustSave("p-shared-stale", "shared-mixed.netbird.io", nil, rpproxy.StatusConnected, stale) + + // shared-offline: only stale proxies → offline, connected=0, + // but row must still appear (this is the new semantic — old + // query would have dropped it entirely). + mustSave("p-shared-off", "shared-offline.netbird.io", nil, rpproxy.StatusConnected, stale) + + // account-online: BYOP cluster owned by acctID, fresh + mustSave("p-acct-fresh", "byop.acct.example", &acctID, rpproxy.StatusConnected, fresh) + + // other-account: must not surface for acctID + mustSave("p-other", "byop.other.example", &otherID, rpproxy.StatusConnected, fresh) + + clusters, err := store.GetProxyClusters(ctx, accountID) + require.NoError(t, err) + + byAddr := map[string]rpproxy.Cluster{} + for _, c := range clusters { + byAddr[c.Address] = c + } + + assert.NotContains(t, byAddr, "byop.other.example", + "another account's BYOP cluster must not leak into this account's listing") + + require.Contains(t, byAddr, "shared-mixed.netbird.io") + mixed := byAddr["shared-mixed.netbird.io"] + assert.Equal(t, rpproxy.ClusterTypeShared, mixed.Type, "shared cluster (account_id IS NULL) must be reported as Type=shared") + assert.True(t, mixed.Online, "cluster with a fresh proxy must be online") + assert.Equal(t, 1, mixed.ConnectedProxies, "connected_proxies must count only fresh proxies; the stale one should not bump the count") + + require.Contains(t, byAddr, "shared-offline.netbird.io", + "offline clusters must still appear so the dashboard can render them — the old GetActiveProxyClusters would have dropped this row, which is the regression this test guards against") + offline := byAddr["shared-offline.netbird.io"] + assert.Equal(t, rpproxy.ClusterTypeShared, offline.Type) + assert.False(t, offline.Online, "no fresh heartbeat → offline") + assert.Equal(t, 0, offline.ConnectedProxies, "no fresh proxies → connected_proxies=0") + + require.Contains(t, byAddr, "byop.acct.example") + acct := byAddr["byop.acct.example"] + assert.Equal(t, rpproxy.ClusterTypeAccount, acct.Type, "BYOP cluster owned by the account must be reported as Type=account") + assert.True(t, acct.Online) + assert.Equal(t, 1, acct.ConnectedProxies) + }) +} diff --git a/management/server/store/store.go b/management/server/store/store.go index 045f1576a..42cdcf36d 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -307,7 +307,7 @@ type Store interface { UpdateProxyHeartbeat(ctx context.Context, p *proxy.Proxy) error GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error) - GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) + GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) GetClusterSupportsCustomPorts(ctx context.Context, clusterAddr string) *bool GetClusterRequireSubdomain(ctx context.Context, clusterAddr string) *bool GetClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool diff --git a/management/server/store/store_mock.go b/management/server/store/store_mock.go index d51629606..4f9d875d2 100644 --- a/management/server/store/store_mock.go +++ b/management/server/store/store_mock.go @@ -380,6 +380,20 @@ func (mr *MockStoreMockRecorder) DeleteAccount(ctx, account interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccount", reflect.TypeOf((*MockStore)(nil).DeleteAccount), ctx, account) } +// DeleteAccountCluster mocks base method. +func (m *MockStore) DeleteAccountCluster(ctx context.Context, clusterAddress, accountID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteAccountCluster", ctx, clusterAddress, accountID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteAccountCluster indicates an expected call of DeleteAccountCluster. +func (mr *MockStoreMockRecorder) DeleteAccountCluster(ctx, clusterAddress, accountID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccountCluster", reflect.TypeOf((*MockStore)(nil).DeleteAccountCluster), ctx, clusterAddress, accountID) +} + // DeleteCustomDomain mocks base method. func (m *MockStore) DeleteCustomDomain(ctx context.Context, accountID, domainID string) error { m.ctrl.T.Helper() @@ -577,20 +591,6 @@ func (mr *MockStoreMockRecorder) DeletePostureChecks(ctx, accountID, postureChec return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePostureChecks", reflect.TypeOf((*MockStore)(nil).DeletePostureChecks), ctx, accountID, postureChecksID) } -// DeleteAccountCluster mocks base method. -func (m *MockStore) DeleteAccountCluster(ctx context.Context, clusterAddress, accountID string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteAccountCluster", ctx, clusterAddress, accountID) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteAccountCluster indicates an expected call of DeleteAccountCluster. -func (mr *MockStoreMockRecorder) DeleteAccountCluster(ctx, clusterAddress, accountID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccountCluster", reflect.TypeOf((*MockStore)(nil).DeleteAccountCluster), ctx, clusterAddress, accountID) -} - // DeleteRoute mocks base method. func (m *MockStore) DeleteRoute(ctx context.Context, accountID, routeID string) error { m.ctrl.T.Helper() @@ -731,6 +731,20 @@ func (mr *MockStoreMockRecorder) DeleteZoneDNSRecords(ctx, accountID, zoneID int return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteZoneDNSRecords", reflect.TypeOf((*MockStore)(nil).DeleteZoneDNSRecords), ctx, accountID, zoneID) } +// DisconnectProxy mocks base method. +func (m *MockStore) DisconnectProxy(ctx context.Context, proxyID, sessionID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DisconnectProxy", ctx, proxyID, sessionID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DisconnectProxy indicates an expected call of DisconnectProxy. +func (mr *MockStoreMockRecorder) DisconnectProxy(ctx, proxyID, sessionID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisconnectProxy", reflect.TypeOf((*MockStore)(nil).DisconnectProxy), ctx, proxyID, sessionID) +} + // EphemeralServiceExists mocks base method. func (m *MockStore) EphemeralServiceExists(ctx context.Context, lockStrength LockingStrength, accountID, peerID, domain string) (bool, error) { m.ctrl.T.Helper() @@ -1332,21 +1346,6 @@ func (mr *MockStoreMockRecorder) GetActiveProxyClusterAddressesForAccount(ctx, a return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveProxyClusterAddressesForAccount", reflect.TypeOf((*MockStore)(nil).GetActiveProxyClusterAddressesForAccount), ctx, accountID) } -// GetActiveProxyClusters mocks base method. -func (m *MockStore) GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetActiveProxyClusters", ctx, accountID) - ret0, _ := ret[0].([]proxy.Cluster) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetActiveProxyClusters indicates an expected call of GetActiveProxyClusters. -func (mr *MockStoreMockRecorder) GetActiveProxyClusters(ctx, accountID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveProxyClusters", reflect.TypeOf((*MockStore)(nil).GetActiveProxyClusters), ctx, accountID) -} - // GetAllAccounts mocks base method. func (m *MockStore) GetAllAccounts(ctx context.Context) []*types2.Account { m.ctrl.T.Helper() @@ -2048,6 +2047,21 @@ func (mr *MockStoreMockRecorder) GetProxyByAccountID(ctx, accountID interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyByAccountID", reflect.TypeOf((*MockStore)(nil).GetProxyByAccountID), ctx, accountID) } +// GetProxyClusters mocks base method. +func (m *MockStore) GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetProxyClusters", ctx, accountID) + ret0, _ := ret[0].([]proxy.Cluster) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetProxyClusters indicates an expected call of GetProxyClusters. +func (mr *MockStoreMockRecorder) GetProxyClusters(ctx, accountID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyClusters", reflect.TypeOf((*MockStore)(nil).GetProxyClusters), ctx, accountID) +} + // GetResourceGroups mocks base method. func (m *MockStore) GetResourceGroups(ctx context.Context, lockStrength LockingStrength, accountID, resourceID string) ([]*types2.Group, error) { m.ctrl.T.Helper() @@ -2950,20 +2964,6 @@ func (mr *MockStoreMockRecorder) SaveProxy(ctx, proxy interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveProxy", reflect.TypeOf((*MockStore)(nil).SaveProxy), ctx, proxy) } -// DisconnectProxy mocks base method. -func (m *MockStore) DisconnectProxy(ctx context.Context, proxyID, sessionID string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DisconnectProxy", ctx, proxyID, sessionID) - ret0, _ := ret[0].(error) - return ret0 -} - -// DisconnectProxy indicates an expected call of DisconnectProxy. -func (mr *MockStoreMockRecorder) DisconnectProxy(ctx, proxyID, sessionID interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisconnectProxy", reflect.TypeOf((*MockStore)(nil).DisconnectProxy), ctx, proxyID, sessionID) -} - // SaveProxyAccessToken mocks base method. func (m *MockStore) SaveProxyAccessToken(ctx context.Context, token *types2.ProxyAccessToken) error { m.ctrl.T.Helper() diff --git a/proxy/management_integration_test.go b/proxy/management_integration_test.go index 9fd3d2ce9..d7e891801 100644 --- a/proxy/management_integration_test.go +++ b/proxy/management_integration_test.go @@ -366,7 +366,7 @@ func (m *storeBackedServiceManager) GetServiceByDomain(ctx context.Context, doma return m.store.GetServiceByDomain(ctx, domain) } -func (m *storeBackedServiceManager) GetActiveClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) { +func (m *storeBackedServiceManager) GetClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) { return nil, nil } diff --git a/shared/management/http/api/openapi.yml b/shared/management/http/api/openapi.yml index 942f3aa45..353aff72d 100644 --- a/shared/management/http/api/openapi.yml +++ b/shared/management/http/api/openapi.yml @@ -3417,19 +3417,43 @@ components: type: string description: Cluster address used for CNAME targets example: "eu.proxy.netbird.io" + type: + $ref: '#/components/schemas/ProxyClusterType' + online: + type: boolean + description: Whether at least one proxy in the cluster has heartbeated within the active window + example: true connected_proxies: type: integer - description: Number of proxy nodes connected in this cluster + description: Number of proxy nodes currently connected (heartbeat within the active window) example: 3 - self_hosted: + supports_custom_ports: type: boolean - description: Whether this cluster is a self-hosted (BYOP) proxy managed by the account owner + description: Whether the cluster supports binding arbitrary TCP/UDP ports + example: true + require_subdomain: + type: boolean + description: Whether services on this cluster must include a subdomain label + example: false + supports_crowdsec: + type: boolean + description: Whether all active proxies in the cluster have CrowdSec configured example: false required: - id - address + - type + - online - connected_proxies - - self_hosted + ProxyClusterType: + type: string + description: | + Source of the proxy cluster. `account` clusters are owned and operated by the account (BYOP); + `shared` clusters are operated by NetBird and shared across accounts. + enum: + - account + - shared + example: shared ReverseProxyDomainType: type: string description: Type of Reverse Proxy Domain diff --git a/shared/management/http/api/types.gen.go b/shared/management/http/api/types.gen.go index b3bb475a9..16e765f8c 100644 --- a/shared/management/http/api/types.gen.go +++ b/shared/management/http/api/types.gen.go @@ -1,6 +1,6 @@ // Package api provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.6.0 DO NOT EDIT. +// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.7.0 DO NOT EDIT. package api import ( @@ -13,8 +13,8 @@ import ( ) const ( - BearerAuthScopes = "BearerAuth.Scopes" - TokenAuthScopes = "TokenAuth.Scopes" + BearerAuthScopes bearerAuthContextKey = "BearerAuth.Scopes" + TokenAuthScopes tokenAuthContextKey = "TokenAuth.Scopes" ) // Defines values for AccessRestrictionsCrowdsecMode. @@ -511,6 +511,7 @@ func (e GroupMinimumIssued) Valid() bool { // Defines values for IdentityProviderType. const ( + IdentityProviderTypeAdfs IdentityProviderType = "adfs" IdentityProviderTypeEntra IdentityProviderType = "entra" IdentityProviderTypeGoogle IdentityProviderType = "google" IdentityProviderTypeMicrosoft IdentityProviderType = "microsoft" @@ -518,12 +519,13 @@ const ( IdentityProviderTypeOkta IdentityProviderType = "okta" IdentityProviderTypePocketid IdentityProviderType = "pocketid" IdentityProviderTypeZitadel IdentityProviderType = "zitadel" - IdentityProviderTypeAdfs IdentityProviderType = "adfs" ) // Valid indicates whether the value is a known member of the IdentityProviderType enum. func (e IdentityProviderType) Valid() bool { switch e { + case IdentityProviderTypeAdfs: + return true case IdentityProviderTypeEntra: return true case IdentityProviderTypeGoogle: @@ -538,8 +540,6 @@ func (e IdentityProviderType) Valid() bool { return true case IdentityProviderTypeZitadel: return true - case IdentityProviderTypeAdfs: - return true default: return false } @@ -878,6 +878,24 @@ func (e PolicyRuleUpdateProtocol) Valid() bool { } } +// Defines values for ProxyClusterType. +const ( + ProxyClusterTypeAccount ProxyClusterType = "account" + ProxyClusterTypeShared ProxyClusterType = "shared" +) + +// Valid indicates whether the value is a known member of the ProxyClusterType enum. +func (e ProxyClusterType) Valid() bool { + switch e { + case ProxyClusterTypeAccount: + return true + case ProxyClusterTypeShared: + return true + default: + return false + } +} + // Defines values for ResourceType. const ( ResourceTypeDomain ResourceType = "domain" @@ -1638,7 +1656,9 @@ type Checks struct { // OsVersionCheck Posture check for the version of operating system OsVersionCheck *OSVersionCheck `json:"os_version_check,omitempty"` - // PeerNetworkRangeCheck Posture check for allow or deny access based on the peer's IP addresses. A range matches when it contains any of the peer's local network interface IPs or its public connection (NAT egress) IP, so ranges may target private subnets, public CIDRs, or single hosts via a /32 or /128. + // PeerNetworkRangeCheck Posture check for allow or deny access based on the peer's IP addresses. A range matches when it + // contains any of the peer's local network interface IPs or its public connection (NAT egress) IP, + // so ranges may target private subnets, public CIDRs, or single hosts via a /32 or /128. PeerNetworkRangeCheck *PeerNetworkRangeCheck `json:"peer_network_range_check,omitempty"` // ProcessCheck Posture Check for binaries exist and are running in the peer’s system @@ -3330,7 +3350,9 @@ type PeerMinimum struct { Name string `json:"name"` } -// PeerNetworkRangeCheck Posture check for allow or deny access based on the peer's IP addresses. A range matches when it contains any of the peer's local network interface IPs or its public connection (NAT egress) IP, so ranges may target private subnets, public CIDRs, or single hosts via a /32 or /128. +// PeerNetworkRangeCheck Posture check for allow or deny access based on the peer's IP addresses. A range matches when it +// contains any of the peer's local network interface IPs or its public connection (NAT egress) IP, +// so ranges may target private subnets, public CIDRs, or single hosts via a /32 or /128. type PeerNetworkRangeCheck struct { // Action Action to take upon policy match Action PeerNetworkRangeCheckAction `json:"action"` @@ -3785,19 +3807,36 @@ type ProxyAccessLogsResponse struct { // ProxyCluster A proxy cluster represents a group of proxy nodes serving the same address type ProxyCluster struct { - // Id Unique identifier of a proxy in this cluster - Id string `json:"id"` - // Address Cluster address used for CNAME targets Address string `json:"address"` - // ConnectedProxies Number of proxy nodes connected in this cluster + // ConnectedProxies Number of proxy nodes currently connected (heartbeat within the active window) ConnectedProxies int `json:"connected_proxies"` - // SelfHosted Whether this cluster is a self-hosted (BYOP) proxy managed by the account owner - SelfHosted bool `json:"self_hosted"` + // Id Unique identifier of a proxy in this cluster + Id string `json:"id"` + + // Online Whether at least one proxy in the cluster has heartbeated within the active window + Online bool `json:"online"` + + // RequireSubdomain Whether services on this cluster must include a subdomain label + RequireSubdomain *bool `json:"require_subdomain,omitempty"` + + // SupportsCrowdsec Whether all active proxies in the cluster have CrowdSec configured + SupportsCrowdsec *bool `json:"supports_crowdsec,omitempty"` + + // SupportsCustomPorts Whether the cluster supports binding arbitrary TCP/UDP ports + SupportsCustomPorts *bool `json:"supports_custom_ports,omitempty"` + + // Type Source of the proxy cluster. `account` clusters are owned and operated by the account (BYOP); + // `shared` clusters are operated by NetBird and shared across accounts. + Type ProxyClusterType `json:"type"` } +// ProxyClusterType Source of the proxy cluster. `account` clusters are owned and operated by the account (BYOP); +// `shared` clusters are operated by NetBird and shared across accounts. +type ProxyClusterType string + // ProxyToken defines model for ProxyToken. type ProxyToken struct { CreatedAt time.Time `json:"created_at"` @@ -4820,6 +4859,12 @@ type ZoneRequest struct { // Conflict Standard error response. Note: The exact structure of this error response is inferred from `util.WriteErrorResponse` and `util.WriteError` usage in the provided Go code, as a specific Go struct for errors was not provided. type Conflict = ErrorResponse +// bearerAuthContextKey is the context key for BearerAuth security scheme +type bearerAuthContextKey string + +// tokenAuthContextKey is the context key for TokenAuth security scheme +type tokenAuthContextKey string + // GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic. type GetApiEventsNetworkTrafficParams struct { // Page Page number From c784b0255063b9cbfde830c78670de2400e46c1c Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Wed, 20 May 2026 12:21:03 +0200 Subject: [PATCH 09/10] [misc] Update contribution guidelines (#6219) Update contribution guidelines and PR template to require discussing impactful changes with the team --- .github/pull_request_template.md | 1 + CONTRIBUTING.md | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 9d6bc96eb..8e68054bd 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -12,6 +12,7 @@ - [ ] Is a feature enhancement - [ ] It is a refactor - [ ] Created tests that fail without the change (if possible) +- [ ] This change does **not** modify the public API, gRPC protocols, functionality behavior, CLI / service flags, or introduce a new feature — **OR** I have discussed it with the NetBird team beforehand (link the issue / Slack thread in the description). See [CONTRIBUTING.md](https://github.com/netbirdio/netbird/blob/main/CONTRIBUTING.md#discuss-changes-with-the-netbird-team-first). > By submitting this pull request, you confirm that you have read and agree to the terms of the [Contributor License Agreement](https://github.com/netbirdio/netbird/blob/main/CONTRIBUTOR_LICENSE_AGREEMENT.md). diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 960cd30e9..cd1c087bb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -15,6 +15,7 @@ If you haven't already, join our slack workspace [here](https://docs.netbird.io/ - [Contributing to NetBird](#contributing-to-netbird) - [Contents](#contents) - [Code of conduct](#code-of-conduct) + - [Discuss changes with the NetBird team first](#discuss-changes-with-the-netbird-team-first) - [Directory structure](#directory-structure) - [Development setup](#development-setup) - [Requirements](#requirements) @@ -33,6 +34,14 @@ Conduct which can be found in the file [CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md). By participating, you are expected to uphold this code. Please report unacceptable behavior to community@netbird.io. +## Discuss changes with the NetBird team first + +Changes to the **public API**, **gRPC protocols**, **functionality behavior**, **CLI / service flags**, or **new features** should be discussed with the NetBird team before you start the work. These surfaces are part of NetBird's contract with operators, self-hosters, and downstream integrators, and changes to them have compatibility, security, and release-planning implications that benefit from an early conversation. + +Open an issue or reach out on [Slack](https://docs.netbird.io/slack-url) to talk through what you have in mind. We'll help shape the change, flag any constraints we know about, and confirm the direction so the PR review can focus on implementation rather than design. + +Typical bug fixes, internal refactors, documentation updates, and tests do not need pre-discussion — open the PR directly. + ## Directory structure The NetBird project monorepo is organized to maintain most of its individual dependencies code within their directories, except for a few auxiliary or shared packages. From 9192b4f029f8f0eeaad77fff8625c34ba9849668 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Wed, 20 May 2026 20:09:22 +0900 Subject: [PATCH 10/10] [client] Bump macOS sleep callback timeout to 20s (#6220) --- client/internal/sleep/detector_darwin.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/client/internal/sleep/detector_darwin.go b/client/internal/sleep/detector_darwin.go index ef495bded..fc4713b21 100644 --- a/client/internal/sleep/detector_darwin.go +++ b/client/internal/sleep/detector_darwin.go @@ -188,7 +188,9 @@ func (d *Detector) triggerCallback(event EventType, cb func(event EventType), do } doneChan := make(chan struct{}) - timeout := time.NewTimer(500 * time.Millisecond) + // macOS forces sleep ~30s after kIOMessageSystemWillSleep, so block long + // enough for teardown to finish while staying under that deadline. + timeout := time.NewTimer(20 * time.Second) defer timeout.Stop() go func() {