Compare commits

...

25 Commits

Author SHA1 Message Date
mlsmaycon
5ebd39ad33 Log SQL connection pool stats periodically to monitor saturation and improve debugging. 2026-04-25 09:54:58 +02:00
mlsmaycon
69c0b96d73 Refactor fast-path Sync to log skip reasons, streamline tryFastPathSync outputs, and improve debug consistency. 2026-04-24 21:25:32 +02:00
mlsmaycon
d3ea28734c Introduce network serial caching in sync fast path, optimize DB reads, and add granular cache invalidation 2026-04-24 20:50:47 +02:00
mlsmaycon
4dddafc5a1 Add caching for ExtraSettings and peer groups in fast path to reduce DB reads. 2026-04-24 19:19:58 +02:00
mlsmaycon
8c521a7cb5 Refactor sync fast path to introduce caching for ExtraSettings and peer groups, optimize MarkPeerConnected with async writes, and reduce DB round trips. 2026-04-24 18:13:37 +02:00
mlsmaycon
ac6b73005d Upgrade cache logic in sync fast path to handle legacy entries and avoid corrupting HasUser flag. 2026-04-24 17:35:33 +02:00
mlsmaycon
cf7081e592 Refactor peer cache logic in sync fast path; consolidate and optimize write operations 2026-04-24 13:33:15 +02:00
mlsmaycon
94730fe066 Add debug log for cache hit in sync fast path 2026-04-24 12:00:32 +02:00
mlsmaycon
7e9d3485d8 [management] Cache peer snapshot + consolidate auth reads on Sync hot path
Trim the fast-path Sync handler by removing two DB round trips on cache hit:

1. Consolidate GetUserIDByPeerKey + GetAccountIDByPeerPubKey into a single
   GetPeerAuthInfoByPubKey store call. Both looked up the same peer row by
   pubkey and returned one column each; the new method SELECTs both columns
   in one query. AccountManager exposes it as GetPeerAuthInfo.

2. Extend peerSyncEntry with AccountID, PeerID, PeerKey, Ephemeral and a
   HasUser flag so the cache carries everything the fast path needs. On
   cache hit with a matching metaHash:

    - The Sync handler skips GetPeerAuthInfo entirely (entry.AccountID and
      entry.HasUser drive the loginFilter gate).
    - commitFastPath skips GetPeerByPeerPubKey by using the cached peer
      snapshot for OnPeerConnectedWithPeer.

Old cache entries in the pre-step-2 shape still decode (missing fields zero
out), but IsComplete() returns false, so they fall through to the slow path
and get rewritten in the full shape on the first pass. No migration is needed.

Expected impact on a 16.8 s pathological Sync observed in production:
~6 s saved by eliminating one auth-read round trip (the consolidated query),
the pre-fast-path GetPeerAuthInfo on cache hit, and GetPeerByPeerPubKey in
commitFastPath. Cache misses and cold starts remain unchanged on the slow path.

Account-serial, ExtraSettings and peer-group caching — the remaining
synchronous DB reads — are deliberately left for a follow-up so the
invalidation design can be proven incrementally.
2026-04-24 11:41:59 +02:00
mlsmaycon
5993264d34 Add detailed timing logs to sync fast path operations 2026-04-24 08:07:12 +02:00
mlsmaycon
617ceab2e3 Add OnPeerConnectedWithPeer to optimize sync fast path operations 2026-04-22 22:40:31 +02:00
mlsmaycon
53deabbdb4 Add timing log for GetExtraSettings in sync fast path 2026-04-22 15:00:21 +02:00
mlsmaycon
ac3fe4343b Refactor sync fast path logging for improved clarity and timing accuracy 2026-04-22 14:24:52 +02:00
mlsmaycon
a4ae160993 Fix deferred logging function in commitFastPath for correct execution 2026-04-22 11:41:32 +02:00
mlsmaycon
3ac4263257 Add timing instrumentation for sync fast path functions 2026-04-22 01:23:44 +02:00
mlsmaycon
dc86c9655d Improve timing precision in sync fast path logging 2026-04-22 00:39:09 +02:00
mlsmaycon
66494d61af Replace Tracef with Debugf for sync fast path logging 2026-04-22 00:06:39 +02:00
mlsmaycon
46446acd30 Add detailed timing logs to sync fast path operations 2026-04-21 23:02:58 +02:00
mlsmaycon
3eb1298cb4 Refactor sync fast path tests and fix CI flakiness
- Introduce `skipOnWindows` helper to properly skip tests relying on Unix-specific paths.
- Replace fixed sleep with `require.Eventually` in `waitForPeerDisconnect` to address flakiness in CI.
- Split `commitFastPath` logic out of `runFastPathSync` to close race conditions and improve clarity.
- Update tests to leverage new helpers and more precise assertions (e.g., `waitForPeerDisconnect`).
- Add `flakyStore` test helper to exercise fail-closed behavior in flag handling.
- Enhance `RunFastPathFlagRoutine` to disable the flag on store read errors.
2026-04-21 17:07:31 +02:00
mlsmaycon
93391fc68f generate only current.bin and android_current.bin on ci/cd 2026-04-21 16:49:54 +02:00
mlsmaycon
48c080b861 Replace Redis dependency with a generic cache store for fast path flag handling 2026-04-21 16:28:24 +02:00
mlsmaycon
3716838c25 Remove unused cacheKey helper and testcontainers imports, simplify Redis container setup 2026-04-21 16:17:31 +02:00
mlsmaycon
5d58000dbd Merge branch 'main' into cached-serial-check-on-sync 2026-04-21 15:55:47 +02:00
mlsmaycon
8430b06f2a [management] Add Redis-backed kill switch for Sync fast path
Gate the peer-sync fast path on a runtime flag polled from Redis so operators can roll the optimisation out gradually and flip it off without a redeploy.

Without NB_PEER_SYNC_REDIS_ADDRESS the routine stays disabled, every Sync runs the full network map path, and no entries accumulate in the peer serial cache — bit-for-bit identical to the pre-fast-path behaviour. When the env var is set, a background goroutine polls the configured key (default "peerSyncFastPath") every minute; values "1" or "true" enable the fast path, anything else disables it.

- RunFastPathFlagRoutine mirrors shared/logleveloverrider: dedicated Redis connection, background ticker, redis.Nil treated as disabled.
- NewServer takes the flag handle; tryFastPathSync and the recordPeerSyncEntry helpers short-circuit when Enabled() is false.
- invalidatePeerSyncEntry still runs on Login regardless of flag state.
- NewFastPathFlag(bool) exposed for tests and callers that need to force a state without going through Redis.
2026-04-21 15:52:34 +02:00
mlsmaycon
3f4ef0031b [management] Skip full network map on Sync when peer state is unchanged
Introduce a peer-sync cache keyed by WireGuard pubkey that records the
NetworkMap.Serial and meta hash the server last delivered to each peer.
When a Sync request arrives from a non-Android peer whose cached serial
matches the current account serial and whose meta hash matches the last
delivery, short-circuit SyncAndMarkPeer and reply with a NetbirdConfig-only
SyncResponse mirroring the shape TimeBasedAuthSecretsManager already pushes
for TURN/Relay token rotation. The client keeps its existing network map
state and refreshes only control-plane credentials.

The fast path avoids GetAccountWithBackpressure, the full per-peer map
assembly, posture-check recomputation and the large encrypted payload on
every reconnect of a peer whose account is quiescent. Slow path remains
the source of truth for any real state change; every full-map send (initial
sync or streamed NetworkMap update) rewrites the cache, and every Login
deletes it so a fresh map is guaranteed after SSH key rotation, approval
changes or re-registration.

Backend-only: no proto changes and no client changes. Compatibility is
provided by the existing client handling of nil NetworkMap in handleSync
(every version from v0.20.0 on). Android is gated out at the server because
its readInitialSettings path calls GrpcClient.GetNetworkMap which errors on
nil map. The cache is wired through BaseServer.CacheStore() so it shares
the same Redis/in-memory backend as OneTimeTokenStore and PKCEVerifierStore.

Test coverage lands in five layers:
- Pure decision function (peer_serial_cache_decision_test.go)
- Cache wrapper with TTL + concurrency (peer_serial_cache_test.go)
- Response shape unit tests (sync_fast_path_response_test.go)
- In-process gRPC behavioural tests covering first sync, reconnect skip,
  android never-skip, meta change, login invalidation, and serial advance
  (management/server/sync_fast_path_test.go)
- Frozen SyncRequest wire-format fixtures for v0.20.0 / v0.40.0 / v0.60.0
  / current / android replayed against the in-process server
  (management/server/sync_legacy_wire_test.go + testdata fixtures)
2026-04-17 16:20:04 +02:00
37 changed files with 2511 additions and 52 deletions

View File

@@ -426,6 +426,9 @@ jobs:
if: matrix.store == 'mysql'
run: docker pull mlsmaycon/warmed-mysql:8
- name: Generate current sync wire fixtures
run: go run ./management/server/testdata/sync_request_wire/generate.go
- name: Test
run: |
CGO_ENABLED=1 GOARCH=${{ matrix.arch }} \

View File

@@ -135,7 +135,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
if err != nil {
t.Fatal(err)
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -1671,7 +1671,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
if err != nil {
return nil, "", err
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
return nil, "", err
}

View File

@@ -335,7 +335,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
if err != nil {
return nil, "", err
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
return nil, "", err
}

View File

@@ -132,9 +132,18 @@ func (c *Controller) OnPeerConnected(ctx context.Context, accountID string, peer
return nil, fmt.Errorf("failed to get peer %s: %v", peerID, err)
}
c.EphemeralPeersManager.OnPeerConnected(ctx, peer)
return c.OnPeerConnectedWithPeer(ctx, accountID, peer)
}
return c.peersUpdateManager.CreateChannel(ctx, peerID), nil
// OnPeerConnectedWithPeer is the peer-object variant of OnPeerConnected. It
// skips the internal GetPeerByID and is intended for callers that already
// hold the peer (e.g. the Sync fast path). The accountID parameter is kept
// for symmetry with OnPeerConnected even though the peer object already
// carries it — callers typically have it handy from the surrounding context.
func (c *Controller) OnPeerConnectedWithPeer(ctx context.Context, accountID string, peer *nbpeer.Peer) (chan *network_map.UpdateMessage, error) {
_ = accountID
c.EphemeralPeersManager.OnPeerConnected(ctx, peer)
return c.peersUpdateManager.CreateChannel(ctx, peer.ID), nil
}
func (c *Controller) OnPeerDisconnected(ctx context.Context, accountID string, peerID string) {

View File

@@ -35,6 +35,11 @@ type Controller interface {
OnPeersDeleted(ctx context.Context, accountID string, peerIDs []string) error
DisconnectPeers(ctx context.Context, accountId string, peerIDs []string)
OnPeerConnected(ctx context.Context, accountID string, peerID string) (chan *UpdateMessage, error)
// OnPeerConnectedWithPeer is equivalent to OnPeerConnected but accepts an
// already-fetched peer, skipping the internal GetPeerByID lookup. Intended
// for callers that have already resolved the peer (e.g. the Sync fast path)
// so the controller does not re-read what the caller just read.
OnPeerConnectedWithPeer(ctx context.Context, accountID string, peer *nbpeer.Peer) (chan *UpdateMessage, error)
OnPeerDisconnected(ctx context.Context, accountID string, peerID string)
TrackEphemeralPeer(ctx context.Context, peer *nbpeer.Peer)

View File

@@ -1,9 +1,9 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: management/internals/controllers/network_map/interface.go
// Source: ./interface.go
//
// Generated by this command:
//
// mockgen -package network_map -destination=management/internals/controllers/network_map/interface_mock.go -source=management/internals/controllers/network_map/interface.go -build_flags=-mod=mod
// mockgen -package network_map -destination=interface_mock.go -source=./interface.go -build_flags=-mod=mod
//
// Package network_map is a generated GoMock package.
@@ -145,6 +145,21 @@ func (mr *MockControllerMockRecorder) OnPeerConnected(ctx, accountID, peerID any
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPeerConnected", reflect.TypeOf((*MockController)(nil).OnPeerConnected), ctx, accountID, peerID)
}
// OnPeerConnectedWithPeer mocks base method.
func (m *MockController) OnPeerConnectedWithPeer(ctx context.Context, accountID string, arg2 *peer.Peer) (chan *UpdateMessage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OnPeerConnectedWithPeer", ctx, accountID, arg2)
ret0, _ := ret[0].(chan *UpdateMessage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OnPeerConnectedWithPeer indicates an expected call of OnPeerConnectedWithPeer.
func (mr *MockControllerMockRecorder) OnPeerConnectedWithPeer(ctx, accountID, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPeerConnectedWithPeer", reflect.TypeOf((*MockController)(nil).OnPeerConnectedWithPeer), ctx, accountID, arg2)
}
// OnPeerDisconnected mocks base method.
func (m *MockController) OnPeerDisconnected(ctx context.Context, accountID, peerID string) {
m.ctrl.T.Helper()

View File

@@ -163,7 +163,9 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
}
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider())
peerSerialCache := nbgrpc.NewPeerSerialCache(context.Background(), s.CacheStore(), nbgrpc.DefaultPeerSerialCacheTTL)
fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), s.CacheStore(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathFlagKey)
srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag, s.CacheStore())
if err != nil {
log.Fatalf("failed to create management server: %v", err)
}

View File

@@ -0,0 +1,48 @@
// Package fastpathcache exposes the key prefixes and delete helpers for the
// Sync fast-path caches so mutation sites outside the gRPC server package
// can invalidate stale entries without a circular import on the grpc
// package that owns the read-side cache wrappers.
package fastpathcache
import (
"context"
"github.com/eko/gocache/lib/v4/cache"
cachestore "github.com/eko/gocache/lib/v4/store"
log "github.com/sirupsen/logrus"
)
const (
// ExtraSettingsKeyPrefix matches the prefix used by the read-side
// extraSettingsCache in management/internals/shared/grpc. Keep these in
// sync; drift would leak stale reads on mutations.
ExtraSettingsKeyPrefix = "extra-settings:"
// PeerGroupsKeyPrefix matches the prefix used by the read-side
// peerGroupsCache in management/internals/shared/grpc.
PeerGroupsKeyPrefix = "peer-groups:"
)
// InvalidateExtraSettings removes the cached ExtraSettings entry for the
// given account from the shared cache store. Safe to call with a nil store
// and safe to call when no entry exists. Errors are swallowed at debug level
// so mutation flows never fail because of a cache hiccup.
func InvalidateExtraSettings(ctx context.Context, store cachestore.StoreInterface, accountID string) {
if store == nil {
return
}
if err := cache.New[string](store).Delete(ctx, ExtraSettingsKeyPrefix+accountID); err != nil {
log.WithContext(ctx).Debugf("fastpathcache: invalidate extra settings for %s: %v", accountID, err)
}
}
// InvalidatePeerGroups removes the cached peer-groups entry for a peer. Safe
// to call with a nil store and safe to call when no entry exists.
func InvalidatePeerGroups(ctx context.Context, store cachestore.StoreInterface, peerID string) {
if store == nil {
return
}
if err := cache.New[string](store).Delete(ctx, PeerGroupsKeyPrefix+peerID); err != nil {
log.WithContext(ctx).Debugf("fastpathcache: invalidate peer groups for %s: %v", peerID, err)
}
}

View File

@@ -0,0 +1,122 @@
package grpc
import (
"context"
"encoding/json"
"time"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/store"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/internals/shared/fastpathcache"
nbtypes "github.com/netbirdio/netbird/management/server/types"
)
const (
extraSettingsCacheKeyPrefix = fastpathcache.ExtraSettingsKeyPrefix
peerGroupsCacheKeyPrefix = fastpathcache.PeerGroupsKeyPrefix
// DefaultExtraSettingsCacheTTL bounds how long a cached ExtraSettings
// blob survives. Settings rarely change, so a ~30s staleness window is
// cheap; mutation sites can additionally invalidate entries explicitly
// via the fastpathcache delete helpers.
DefaultExtraSettingsCacheTTL = 30 * time.Second
// DefaultPeerGroupsCacheTTL bounds how long a cached peer group set
// survives. Shorter than ExtraSettings because group membership changes
// have user-visible authz implications.
DefaultPeerGroupsCacheTTL = 15 * time.Second
)
// extraSettingsCache caches the ExtraSettings JSON per account so the fast
// path's buildFastPathResponse can skip GetExtraSettings on cache hit.
// TTL-based; staleness window is ~DefaultExtraSettingsCacheTTL.
type extraSettingsCache struct {
cache *cache.Cache[string]
ctx context.Context
ttl time.Duration
}
func newExtraSettingsCache(ctx context.Context, cacheStore store.StoreInterface, ttl time.Duration) *extraSettingsCache {
if cacheStore == nil {
return nil
}
return &extraSettingsCache{cache: cache.New[string](cacheStore), ctx: ctx, ttl: ttl}
}
func (c *extraSettingsCache) get(accountID string) (*nbtypes.ExtraSettings, bool) {
if c == nil {
return nil, false
}
raw, err := c.cache.Get(c.ctx, extraSettingsCacheKeyPrefix+accountID)
if err != nil {
return nil, false
}
var es nbtypes.ExtraSettings
if err := json.Unmarshal([]byte(raw), &es); err != nil {
log.Debugf("extra settings cache: unmarshal for %s: %v", accountID, err)
return nil, false
}
return &es, true
}
func (c *extraSettingsCache) set(accountID string, es *nbtypes.ExtraSettings) {
if c == nil || es == nil {
return
}
payload, err := json.Marshal(es)
if err != nil {
log.Debugf("extra settings cache: marshal for %s: %v", accountID, err)
return
}
if err := c.cache.Set(c.ctx, extraSettingsCacheKeyPrefix+accountID, string(payload), store.WithExpiration(c.ttl)); err != nil {
log.Debugf("extra settings cache: set for %s: %v", accountID, err)
}
}
// peerGroupsCache caches the list of group IDs a peer belongs to so the fast
// path's buildFastPathResponse can skip GetPeerGroupIDs on cache hit. The
// cache key includes the peerID; group membership changes propagate via TTL.
type peerGroupsCache struct {
cache *cache.Cache[string]
ctx context.Context
ttl time.Duration
}
func newPeerGroupsCache(ctx context.Context, cacheStore store.StoreInterface, ttl time.Duration) *peerGroupsCache {
if cacheStore == nil {
return nil
}
return &peerGroupsCache{cache: cache.New[string](cacheStore), ctx: ctx, ttl: ttl}
}
func (c *peerGroupsCache) get(peerID string) ([]string, bool) {
if c == nil {
return nil, false
}
raw, err := c.cache.Get(c.ctx, peerGroupsCacheKeyPrefix+peerID)
if err != nil {
return nil, false
}
var ids []string
if err := json.Unmarshal([]byte(raw), &ids); err != nil {
log.Debugf("peer groups cache: unmarshal for %s: %v", peerID, err)
return nil, false
}
return ids, true
}
func (c *peerGroupsCache) set(peerID string, ids []string) {
if c == nil {
return
}
payload, err := json.Marshal(ids)
if err != nil {
log.Debugf("peer groups cache: marshal for %s: %v", peerID, err)
return
}
if err := c.cache.Set(c.ctx, peerGroupsCacheKeyPrefix+peerID, string(payload), store.WithExpiration(c.ttl)); err != nil {
log.Debugf("peer groups cache: set for %s: %v", peerID, err)
}
}

View File

@@ -0,0 +1,131 @@
package grpc
import (
"context"
"errors"
"strings"
"sync/atomic"
"time"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/store"
log "github.com/sirupsen/logrus"
)
const (
// DefaultFastPathFlagInterval is the default poll interval for the Sync
// fast-path feature flag. Kept lower than the log-level overrider because
// operators will want the toggle to propagate quickly during rollout.
DefaultFastPathFlagInterval = 1 * time.Minute
// DefaultFastPathFlagKey is the cache key polled by RunFastPathFlagRoutine
// when the caller does not provide an override.
DefaultFastPathFlagKey = "peerSyncFastPath"
)
// FastPathFlag exposes the current on/off state of the Sync fast path. The
// zero value and a nil receiver both report disabled, so callers can always
// treat the flag as a non-nil gate without an additional nil check.
type FastPathFlag struct {
enabled atomic.Bool
}
// NewFastPathFlag returns a FastPathFlag whose state is set to the given
// value. Callers that need the runtime toggle should use
// RunFastPathFlagRoutine instead; this constructor is meant for tests and
// for consumers that want to force the flag on or off.
func NewFastPathFlag(enabled bool) *FastPathFlag {
f := &FastPathFlag{}
f.setEnabled(enabled)
return f
}
// Enabled reports whether the Sync fast path is currently enabled for this
// replica. A nil receiver reports false so a disabled build or test can pass
// a nil flag and skip the fast path entirely.
func (f *FastPathFlag) Enabled() bool {
if f == nil {
return false
}
return f.enabled.Load()
}
func (f *FastPathFlag) setEnabled(v bool) {
if f == nil {
return
}
f.enabled.Store(v)
}
// RunFastPathFlagRoutine starts a background goroutine that polls the shared
// cache store for the Sync fast-path feature flag and updates the returned
// FastPathFlag accordingly. When cacheStore is nil the routine returns a
// handle that stays permanently disabled, so every Sync falls back to the
// full network map path.
//
// The shared store is Redis-backed when NB_CACHE_REDIS_ADDRESS is set (so the
// flag is toggled cluster-wide by writing the key in Redis) and falls back to
// an in-process gocache otherwise, which is enough for single-replica dev and
// test setups.
//
// The routine fails closed: any store read error (other than a plain "key not
// found" miss) disables the flag until Redis confirms it is enabled again.
func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface, interval time.Duration, flagKey string) *FastPathFlag {
flag := &FastPathFlag{}
if cacheStore == nil {
log.Infof("Shared cache store not provided. Sync fast path disabled")
return flag
}
if flagKey == "" {
flagKey = DefaultFastPathFlagKey
}
flagCache := cache.New[string](cacheStore)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
refresh := func() {
getCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
value, err := flagCache.Get(getCtx, flagKey)
if err != nil {
var notFound *store.NotFound
if !errors.As(err, &notFound) {
log.Errorf("Sync fast-path flag refresh: %v; disabling fast path", err)
}
flag.setEnabled(false)
return
}
flag.setEnabled(parseFastPathFlag(value))
}
refresh()
for {
select {
case <-ctx.Done():
log.Infof("Stopping Sync fast-path flag routine")
return
case <-ticker.C:
refresh()
}
}
}()
return flag
}
// parseFastPathFlag accepts "1" or "true" (any casing, surrounding whitespace
// tolerated) as enabled and treats every other value as disabled.
func parseFastPathFlag(value string) bool {
v := strings.TrimSpace(value)
if v == "1" {
return true
}
return strings.EqualFold(v, "true")
}

View File

@@ -0,0 +1,176 @@
package grpc
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/eko/gocache/lib/v4/store"
gocache_store "github.com/eko/gocache/store/go_cache/v4"
gocache "github.com/patrickmn/go-cache"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParseFastPathFlag(t *testing.T) {
tests := []struct {
name string
value string
want bool
}{
{"one", "1", true},
{"true lowercase", "true", true},
{"true uppercase", "TRUE", true},
{"true mixed case", "True", true},
{"true with whitespace", " true ", true},
{"zero", "0", false},
{"false", "false", false},
{"empty", "", false},
{"yes", "yes", false},
{"garbage", "garbage", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.want, parseFastPathFlag(tt.value), "parseFastPathFlag(%q)", tt.value)
})
}
}
func TestFastPathFlag_EnabledDefaultsFalse(t *testing.T) {
flag := &FastPathFlag{}
assert.False(t, flag.Enabled(), "zero value flag should report disabled")
}
func TestFastPathFlag_NilSafeEnabled(t *testing.T) {
var flag *FastPathFlag
assert.False(t, flag.Enabled(), "nil flag should report disabled without panicking")
}
func TestFastPathFlag_SetEnabled(t *testing.T) {
flag := &FastPathFlag{}
flag.setEnabled(true)
assert.True(t, flag.Enabled(), "flag should report enabled after setEnabled(true)")
flag.setEnabled(false)
assert.False(t, flag.Enabled(), "flag should report disabled after setEnabled(false)")
}
func TestNewFastPathFlag(t *testing.T) {
assert.True(t, NewFastPathFlag(true).Enabled(), "NewFastPathFlag(true) should report enabled")
assert.False(t, NewFastPathFlag(false).Enabled(), "NewFastPathFlag(false) should report disabled")
}
func TestRunFastPathFlagRoutine_NilStoreStaysDisabled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
flag := RunFastPathFlagRoutine(ctx, nil, 50*time.Millisecond, "peerSyncFastPath")
require.NotNil(t, flag, "RunFastPathFlagRoutine should always return a non-nil flag")
assert.False(t, flag.Enabled(), "flag should stay disabled when no cache store is provided")
time.Sleep(150 * time.Millisecond)
assert.False(t, flag.Enabled(), "flag should remain disabled after wait when no cache store is provided")
}
func TestRunFastPathFlagRoutine_ReadsFlagFromStore(t *testing.T) {
cacheStore := newFastPathTestStore(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "peerSyncFastPath")
require.NotNil(t, flag)
assert.False(t, flag.Enabled(), "flag should start disabled when the key is missing")
require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "1"), "seed flag=1 into shared store")
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should flip enabled after the key is set to 1")
require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "0"), "override flag=0 into shared store")
assert.Eventually(t, func() bool {
return !flag.Enabled()
}, 2*time.Second, 25*time.Millisecond, "flag should flip disabled after the key is set to 0")
require.NoError(t, cacheStore.Delete(ctx, "peerSyncFastPath"), "remove flag key")
assert.Eventually(t, func() bool {
return !flag.Enabled()
}, 2*time.Second, 25*time.Millisecond, "flag should stay disabled after the key is deleted")
require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "true"), "enable via string true")
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should accept \"true\" as enabled")
}
func TestRunFastPathFlagRoutine_MissingKeyKeepsDisabled(t *testing.T) {
cacheStore := newFastPathTestStore(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "peerSyncFastPathAbsent")
require.NotNil(t, flag)
time.Sleep(200 * time.Millisecond)
assert.False(t, flag.Enabled(), "flag should stay disabled when the key is missing from the store")
}
func TestRunFastPathFlagRoutine_DefaultKeyUsedWhenEmpty(t *testing.T) {
cacheStore := newFastPathTestStore(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
require.NoError(t, cacheStore.Set(ctx, DefaultFastPathFlagKey, "1"), "seed default key")
flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "")
require.NotNil(t, flag)
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "empty flagKey should fall back to DefaultFastPathFlagKey")
}
func newFastPathTestStore(t *testing.T) store.StoreInterface {
t.Helper()
return gocache_store.NewGoCache(gocache.New(5*time.Minute, 10*time.Minute))
}
func TestRunFastPathFlagRoutine_FailsClosedOnReadError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
s := &flakyStore{
StoreInterface: newFastPathTestStore(t),
}
require.NoError(t, s.Set(ctx, "peerSyncFastPath", "1"), "seed flag enabled")
flag := RunFastPathFlagRoutine(ctx, s, 50*time.Millisecond, "peerSyncFastPath")
require.NotNil(t, flag)
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should flip enabled while store reads succeed")
s.setGetError(errors.New("simulated transient store failure"))
assert.Eventually(t, func() bool {
return !flag.Enabled()
}, 2*time.Second, 25*time.Millisecond, "flag should flip disabled on store read error (fail-closed)")
s.setGetError(nil)
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should recover once the store read succeeds again")
}
// flakyStore wraps a real store and lets tests inject a transient Get error
// without affecting Set/Delete. Used to exercise fail-closed behaviour.
type flakyStore struct {
store.StoreInterface
getErr atomic.Pointer[error]
}
func (f *flakyStore) Get(ctx context.Context, key any) (any, error) {
if errPtr := f.getErr.Load(); errPtr != nil && *errPtr != nil {
return nil, *errPtr
}
return f.StoreInterface.Get(ctx, key)
}
func (f *flakyStore) setGetError(err error) {
if err == nil {
f.getErr.Store(nil)
return
}
f.getErr.Store(&err)
}

View File

@@ -0,0 +1,82 @@
package grpc
import (
"context"
"encoding/json"
"time"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/store"
log "github.com/sirupsen/logrus"
)
const (
peerSerialCacheKeyPrefix = "peer-sync:"
// DefaultPeerSerialCacheTTL bounds how long a cached serial survives. If the
// cache write on a full-map send ever drops, entries naturally expire and
// the next Sync falls back to the full path, re-priming the cache.
DefaultPeerSerialCacheTTL = 24 * time.Hour
)
// PeerSerialCache records the NetworkMap serial and meta hash last delivered to
// each peer on Sync. Lookups are used to skip full network map computation when
// the peer already has the latest state. Backed by the shared cache store so
// entries survive management replicas sharing a Redis instance.
type PeerSerialCache struct {
cache *cache.Cache[string]
ctx context.Context
ttl time.Duration
}
// NewPeerSerialCache creates a cache wrapper bound to the shared cache store.
// The ttl is applied to every Set call; entries older than ttl are treated as
// misses so the server eventually converges to delivering a full map even if
// an earlier Set was lost.
func NewPeerSerialCache(ctx context.Context, cacheStore store.StoreInterface, ttl time.Duration) *PeerSerialCache {
return &PeerSerialCache{
cache: cache.New[string](cacheStore),
ctx: ctx,
ttl: ttl,
}
}
// Get returns the entry previously recorded for this peer and whether a valid
// entry was found. A cache miss or any read error is reported as a miss so
// callers fall back to the full map path.
func (c *PeerSerialCache) Get(pubKey string) (peerSyncEntry, bool) {
raw, err := c.cache.Get(c.ctx, peerSerialCacheKeyPrefix+pubKey)
if err != nil {
return peerSyncEntry{}, false
}
entry := peerSyncEntry{}
if err := json.Unmarshal([]byte(raw), &entry); err != nil {
log.Debugf("peer serial cache: unmarshal entry for %s: %v", pubKey, err)
return peerSyncEntry{}, false
}
return entry, true
}
// Set records what the server most recently delivered to this peer. Errors are
// logged at debug level so cache outages degrade gracefully into the full map
// path on the next Sync rather than failing the current Sync.
func (c *PeerSerialCache) Set(pubKey string, entry peerSyncEntry) {
payload, err := json.Marshal(entry)
if err != nil {
log.Debugf("peer serial cache: marshal entry for %s: %v", pubKey, err)
return
}
if err := c.cache.Set(c.ctx, peerSerialCacheKeyPrefix+pubKey, string(payload), store.WithExpiration(c.ttl)); err != nil {
log.Debugf("peer serial cache: set entry for %s: %v", pubKey, err)
}
}
// Delete removes any cached entry for this peer. Used on Login so the next
// Sync always sees a miss and delivers a full map.
func (c *PeerSerialCache) Delete(pubKey string) {
if err := c.cache.Delete(c.ctx, peerSerialCacheKeyPrefix+pubKey); err != nil {
log.Debugf("peer serial cache: delete entry for %s: %v", pubKey, err)
}
}
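The miss-on-error contract above (an absent key, a read error, or an undecodable payload all degrade to a plain cache miss, never an error the Sync handler must deal with) can be exercised in a standalone sketch. The map-backed store and the trimmed entry type here are stand-ins, not the real gocache store or `peerSyncEntry`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// entry is a stand-in for peerSyncEntry, trimmed to two fields.
type entry struct {
	Serial   uint64
	MetaHash uint64
}

// mapCache stands in for the gocache-backed PeerSerialCache: Get treats an
// absent key and a corrupt payload the same way, as a miss.
type mapCache struct{ data map[string]string }

func (c *mapCache) Get(key string) (entry, bool) {
	raw, ok := c.data[key]
	if !ok {
		return entry{}, false // absent: miss, caller takes the full map path
	}
	var e entry
	if err := json.Unmarshal([]byte(raw), &e); err != nil {
		return entry{}, false // corrupt payload: also a miss, not an error
	}
	return e, true
}

func (c *mapCache) Set(key string, e entry) {
	payload, _ := json.Marshal(e) // marshal of this plain struct cannot fail
	c.data[key] = string(payload)
}

// roundTrip exercises the contract: a Set/Get round trip hits, while a
// corrupt payload silently degrades to a miss.
func roundTrip() (hit bool, serial uint64, corruptHit bool) {
	c := &mapCache{data: map[string]string{}}
	c.Set("pubkey", entry{Serial: 42, MetaHash: 7})
	e, hit := c.Get("pubkey")
	c.data["broken"] = "{not json"
	_, corruptHit = c.Get("broken")
	return hit, e.Serial, corruptHit
}

func main() {
	hit, serial, corruptHit := roundTrip()
	fmt.Println(hit, serial, corruptHit)
}
```

The same degrade-to-miss shape is what lets a Redis outage cost only extra full-map computations rather than failed Syncs.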

View File

@@ -0,0 +1,116 @@
package grpc
import "testing"
func TestShouldSkipNetworkMap(t *testing.T) {
tests := []struct {
name string
goOS string
hit bool
cached peerSyncEntry
currentSerial uint64
incomingMeta uint64
want bool
}{
{
name: "android never skips even on clean cache hit",
goOS: "android",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: false,
},
{
name: "android uppercase never skips",
goOS: "Android",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: false,
},
{
name: "cache miss forces full path",
goOS: "linux",
hit: false,
cached: peerSyncEntry{},
currentSerial: 42,
incomingMeta: 7,
want: false,
},
{
name: "serial mismatch forces full path",
goOS: "linux",
hit: true,
cached: peerSyncEntry{Serial: 41, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: false,
},
{
name: "meta mismatch forces full path",
goOS: "linux",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 9,
want: false,
},
{
name: "clean hit on linux skips",
goOS: "linux",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: true,
},
{
name: "clean hit on darwin skips",
goOS: "darwin",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: true,
},
{
name: "clean hit on windows skips",
goOS: "windows",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: true,
},
{
name: "zero current serial never skips",
goOS: "linux",
hit: true,
cached: peerSyncEntry{Serial: 0, MetaHash: 7},
currentSerial: 0,
incomingMeta: 7,
want: false,
},
{
name: "empty goos treated as non-android and skips",
goOS: "",
hit: true,
cached: peerSyncEntry{Serial: 42, MetaHash: 7},
currentSerial: 42,
incomingMeta: 7,
want: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := shouldSkipNetworkMap(tc.goOS, tc.hit, tc.cached, tc.currentSerial, tc.incomingMeta)
if got != tc.want {
t.Fatalf("shouldSkipNetworkMap(%q, hit=%v, cached=%+v, current=%d, meta=%d) = %v, want %v",
tc.goOS, tc.hit, tc.cached, tc.currentSerial, tc.incomingMeta, got, tc.want)
}
})
}
}

View File

@@ -0,0 +1,134 @@
package grpc
import (
"context"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
nbcache "github.com/netbirdio/netbird/management/server/cache"
)
func newTestPeerSerialCache(t *testing.T, ttl, cleanup time.Duration) *PeerSerialCache {
t.Helper()
s, err := nbcache.NewStore(context.Background(), ttl, cleanup, 100)
require.NoError(t, err, "cache store must initialise")
return NewPeerSerialCache(context.Background(), s, ttl)
}
func TestPeerSerialCache_GetSetDelete(t *testing.T) {
c := newTestPeerSerialCache(t, time.Minute, time.Minute)
key := "pubkey-aaa"
_, hit := c.Get(key)
assert.False(t, hit, "empty cache must miss")
c.Set(key, peerSyncEntry{Serial: 42, MetaHash: 7})
entry, hit := c.Get(key)
require.True(t, hit, "after Set, Get must hit")
assert.Equal(t, uint64(42), entry.Serial, "serial roundtrip")
assert.Equal(t, uint64(7), entry.MetaHash, "meta hash roundtrip")
c.Delete(key)
_, hit = c.Get(key)
assert.False(t, hit, "after Delete, Get must miss")
}
func TestPeerSerialCache_GetMissReturnsZero(t *testing.T) {
c := newTestPeerSerialCache(t, time.Minute, time.Minute)
entry, hit := c.Get("missing")
assert.False(t, hit, "miss must report false")
assert.Equal(t, peerSyncEntry{}, entry, "miss must return zero value")
}
func TestPeerSerialCache_TTLExpiry(t *testing.T) {
c := newTestPeerSerialCache(t, 100*time.Millisecond, 10*time.Millisecond)
key := "pubkey-ttl"
c.Set(key, peerSyncEntry{Serial: 1, MetaHash: 1})
time.Sleep(250 * time.Millisecond)
_, hit := c.Get(key)
assert.False(t, hit, "entry must expire after TTL")
}
func TestPeerSerialCache_OverwriteUpdatesValue(t *testing.T) {
c := newTestPeerSerialCache(t, time.Minute, time.Minute)
key := "pubkey-overwrite"
c.Set(key, peerSyncEntry{Serial: 1, MetaHash: 1})
c.Set(key, peerSyncEntry{Serial: 99, MetaHash: 123})
entry, hit := c.Get(key)
require.True(t, hit, "overwritten key must still be present")
assert.Equal(t, uint64(99), entry.Serial, "overwrite updates serial")
assert.Equal(t, uint64(123), entry.MetaHash, "overwrite updates meta hash")
}
func TestPeerSerialCache_IsolatedPerKey(t *testing.T) {
c := newTestPeerSerialCache(t, time.Minute, time.Minute)
c.Set("a", peerSyncEntry{Serial: 1, MetaHash: 1})
c.Set("b", peerSyncEntry{Serial: 2, MetaHash: 2})
a, hitA := c.Get("a")
b, hitB := c.Get("b")
require.True(t, hitA, "key a must hit")
require.True(t, hitB, "key b must hit")
assert.Equal(t, uint64(1), a.Serial, "key a serial")
assert.Equal(t, uint64(2), b.Serial, "key b serial")
c.Delete("a")
_, hitA = c.Get("a")
_, hitB = c.Get("b")
assert.False(t, hitA, "deleting a must not affect b")
assert.True(t, hitB, "b must remain after a delete")
}
func TestPeerSerialCache_Concurrent(t *testing.T) {
c := newTestPeerSerialCache(t, time.Minute, time.Minute)
var wg sync.WaitGroup
const workers = 50
const iterations = 20
for w := 0; w < workers; w++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := "pubkey"
for i := 0; i < iterations; i++ {
c.Set(key, peerSyncEntry{Serial: uint64(id*iterations + i), MetaHash: uint64(id)})
_, _ = c.Get(key)
}
}(w)
}
wg.Wait()
_, hit := c.Get("pubkey")
assert.True(t, hit, "cache must survive concurrent Set/Get without deadlock")
}
func TestPeerSerialCache_Redis(t *testing.T) {
if os.Getenv(nbcache.RedisStoreEnvVar) == "" {
t.Skipf("set %s to run this test against a real Redis", nbcache.RedisStoreEnvVar)
}
s, err := nbcache.NewStore(context.Background(), time.Minute, 10*time.Second, 10)
require.NoError(t, err, "redis store must initialise")
c := NewPeerSerialCache(context.Background(), s, time.Minute)
key := "redis-pubkey"
c.Set(key, peerSyncEntry{Serial: 42, MetaHash: 7})
entry, hit := c.Get(key)
require.True(t, hit, "redis hit expected")
assert.Equal(t, uint64(42), entry.Serial)
c.Delete(key)
}

View File

@@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"
cachestore "github.com/eko/gocache/lib/v4/store"
pb "github.com/golang/protobuf/proto" // nolint
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
@@ -84,9 +85,35 @@ type Server struct {
reverseProxyManager rpservice.Manager
reverseProxyMu sync.RWMutex
// peerSerialCache lets Sync skip full network map computation when the peer
// already has the latest account serial. A nil cache disables the fast path.
peerSerialCache *PeerSerialCache
// fastPathFlag is the runtime kill switch for the Sync fast path. A nil
// flag or a flag reporting disabled forces every Sync through the full
// network map path.
fastPathFlag *FastPathFlag
// Secondary TTL-based caches used by the Sync fast path to skip DB reads
// for the account's ExtraSettings and a peer's group membership. Both
// are nil-safe and disabled if the shared cache store wasn't provided.
extraSettingsCache *extraSettingsCache
peerGroupsCache *peerGroupsCache
// inflightMarkPeerConnected dedupes the fire-and-forget MarkPeerConnected
// writes kicked off by the fast path. Keys are peer pubkeys; presence
// means a background goroutine is already writing for that peer, so
// concurrent fast-path Syncs for the same peer coalesce to one write.
inflightMarkPeerConnected sync.Map
}
// NewServer creates a new Management server. peerSerialCache and fastPathFlag
// are both optional; when either is nil or the flag reports disabled, the
// Sync fast path is disabled and every request runs the full map computation,
// matching the pre-cache behaviour. cacheStore is used to back the
// secondary fast-path caches (account serial, ExtraSettings, peer groups);
// a nil store silently disables those caches without affecting correctness.
func NewServer(
config *nbconfig.Config,
accountManager account.Manager,
@@ -98,6 +125,9 @@ func NewServer(
integratedPeerValidator integrated_validator.IntegratedValidator,
networkMapController network_map.Controller,
oAuthConfigProvider idp.OAuthConfigProvider,
peerSerialCache *PeerSerialCache,
fastPathFlag *FastPathFlag,
cacheStore cachestore.StoreInterface,
) (*Server, error) {
if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams
@@ -145,6 +175,12 @@ func NewServer(
syncLim: syncLim,
syncLimEnabled: syncLimEnabled,
peerSerialCache: peerSerialCache,
fastPathFlag: fastPathFlag,
extraSettingsCache: newExtraSettingsCache(context.Background(), cacheStore, DefaultExtraSettingsCacheTTL),
peerGroupsCache: newPeerGroupsCache(context.Background(), cacheStore, DefaultPeerGroupsCacheTTL),
}, nil
}
@@ -233,24 +269,50 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
ctx := srv.Context()
syncReq := &proto.SyncRequest{}
parseStart := time.Now()
peerKey, err := s.parseRequest(ctx, req, syncReq)
if err != nil {
s.syncSem.Add(-1)
return err
}
log.WithContext(ctx).Debugf("fast path: parseRequest took %s", time.Since(parseStart))
realIP := getRealIP(ctx)
sRealIP := realIP.String()
peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
metahashed := metaHash(peerMeta, sRealIP)
// Fast path authorisation short-circuit: if the peer-sync cache has a
// complete entry whose metaHash still matches the incoming request, we can
// skip GetPeerAuthInfo entirely. The entry carries AccountID and HasUser
// so we have everything the loginFilter gate and the rest of the handler
// need. On any mismatch we fall back to the DB read below.
var (
userID string
accountID string
)
cachedEntry, cachedEntryHit := s.lookupPeerAuthFromCache(peerKey.String(), metahashed, peerMeta.GoOS)
if cachedEntryHit {
accountID = cachedEntry.AccountID
if cachedEntry.HasUser {
userID = "cached"
}
log.WithContext(ctx).Debugf("fast path: GetPeerAuthInfo skipped (cache hit)")
} else {
authInfoStart := time.Now()
uid, aid, err := s.accountManager.GetPeerAuthInfo(ctx, peerKey.String())
if err != nil {
s.syncSem.Add(-1)
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
return status.Errorf(codes.PermissionDenied, "peer is not registered")
}
return mapError(ctx, err)
}
userID = uid
accountID = aid
log.WithContext(ctx).Debugf("fast path: GetPeerAuthInfo took %s", time.Since(authInfoStart))
}
if userID == "" && !s.loginFilter.allowLogin(peerKey.String(), metahashed) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
@@ -271,19 +333,6 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
@@ -294,7 +343,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
unlock()
}
}()
log.WithContext(ctx).Debugf("fast path: acquirePeerLockByUID took %s", time.Since(start))
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP)
@@ -305,6 +354,12 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
metahash := metaHash(peerMeta, realIP.String())
s.loginFilter.addLogin(peerKey.String(), metahash)
took, skipReason, err := s.tryFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peerMeta, realIP, metahash, srv, &unlock)
if took {
return err
}
log.WithContext(ctx).Debugf("Sync (fast path) skipped reason=%s", skipReason)
peer, netMap, postureChecks, dnsFwdPort, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP, syncStart)
if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
@@ -319,6 +374,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return err
}
s.recordPeerSyncEntry(peerKey.String(), netMap, metahash, peer)
updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID)
if err != nil {
@@ -340,7 +396,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
s.syncSem.Add(-1)
return s.handleUpdates(ctx, accountID, peerKey, peer, metahash, updates, srv, syncStart)
}
func (s *Server) handleHandshake(ctx context.Context, srv proto.ManagementService_JobServer) (wgtypes.Key, error) {
@@ -410,8 +466,9 @@ func (s *Server) sendJobsLoop(ctx context.Context, accountID string, peerKey wgt
// handleUpdates sends updates to the connected peer until the updates channel is closed.
// It implements a backpressure mechanism that sends the first update immediately,
// then debounces subsequent rapid updates, ensuring only the latest update is sent
// after a quiet period. peerMetaHash is forwarded to sendUpdate so the peer-sync
// cache can record the serial this peer just received.
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, peerMetaHash uint64, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer, streamStartTime time.Time) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
// Create a debouncer for this peer connection
@@ -436,7 +493,7 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if debouncer.ProcessUpdate(update) {
// Send immediately (first update or after quiet period)
if err := s.sendUpdate(ctx, accountID, peerKey, peer, peerMetaHash, update, srv, streamStartTime); err != nil {
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
return err
}
@@ -450,7 +507,7 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg
}
log.WithContext(ctx).Debugf("sending %d debounced update(s) for peer %s", len(pendingUpdates), peerKey.String())
for _, pendingUpdate := range pendingUpdates {
if err := s.sendUpdate(ctx, accountID, peerKey, peer, peerMetaHash, pendingUpdate, srv, streamStartTime); err != nil {
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
return err
}
@@ -468,7 +525,9 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg
// sendUpdate encrypts the update message using the peer key and the server's wireguard key,
// then sends the encrypted message to the connected peer via the sync server.
// For MessageTypeNetworkMap updates it records the delivered serial in the
// peer-sync cache so a subsequent Sync with the same serial can take the fast path.
func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, peerMetaHash uint64, update *network_map.UpdateMessage, srv proto.ManagementService_SyncServer, streamStartTime time.Time) error {
key, err := s.secretsManager.GetWGKey()
if err != nil {
s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
@@ -488,6 +547,9 @@ func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtyp
s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
return status.Errorf(codes.Internal, "failed sending update message")
}
if update.MessageType == network_map.MessageTypeNetworkMap {
s.recordPeerSyncEntryFromUpdate(peerKey.String(), update, peerMetaHash, peer)
}
log.WithContext(ctx).Debugf("sent an update to peer %s", peerKey.String())
return nil
}
@@ -772,6 +834,7 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
log.WithContext(ctx).Warnf("failed logging in peer %s: %s", peerKey, err)
return nil, mapError(ctx, err)
}
s.invalidatePeerSyncEntry(peerKey.String())
loginResp, err := s.prepareLoginResponse(ctx, peer, netMap, postureChecks)
if err != nil {

View File

@@ -0,0 +1,562 @@
package grpc
import (
"context"
"net"
"strings"
"time"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
"github.com/netbirdio/netbird/encryption"
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
nbtypes "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
)
// peerGroupFetcher returns the group IDs a peer belongs to. It is a dependency
// of buildFastPathResponse so tests can inject a stub without a real store.
type peerGroupFetcher func(ctx context.Context, accountID, peerID string) ([]string, error)
// peerSyncEntry records what the server last delivered to a peer on Sync so we
// can decide whether the next Sync can skip the full network map computation.
// It also carries the minimum peer/auth metadata needed to run the fast path
// without a DB round-trip on cache hit.
type peerSyncEntry struct {
// Serial is the NetworkMap.Serial the server last included in a full map
// delivered to this peer.
Serial uint64
// MetaHash is the metaHash() value of the peer metadata at the time of that
// delivery, used to detect a meta change on reconnect.
MetaHash uint64
// AccountID is the peer's account ID. Cached so the Sync hot path can skip
// GetPeerAuthInfo on cache hit.
AccountID string
// PeerID is the peer's internal ID, needed for network-map subscription
// and update-channel routing.
PeerID string
// PeerKey mirrors the cache key (peer's wireguard pubkey) so the peer
// snapshot carries everything required by cancelPeerRoutines without a
// second store lookup.
PeerKey string
// Ephemeral is the peer's ephemeral flag, used by EphemeralPeersManager
// on subscribe/unsubscribe.
Ephemeral bool
// HasUser is true if the peer is user-owned (peer.UserID != ""). Used in
// place of GetUserIDByPeerKey's result to drive the loginFilter gate on
// cache hit.
HasUser bool
}
// IsComplete reports whether the entry has every field the pure-cache fast
// path needs. Entries written by older code (before step 2) will carry only
// Serial and MetaHash and must fall back to the slow path so the cache is
// repopulated with the full shape.
func (e peerSyncEntry) IsComplete() bool {
return e.AccountID != "" && e.PeerID != "" && e.PeerKey != ""
}
// PeerSnapshot reconstructs the minimum *nbpeer.Peer needed by
// OnPeerConnectedWithPeer, EphemeralPeersManager, handleUpdates,
// cancelPeerRoutines, and buildFastPathResponse.
func (e peerSyncEntry) PeerSnapshot() *nbpeer.Peer {
return &nbpeer.Peer{
ID: e.PeerID,
Key: e.PeerKey,
AccountID: e.AccountID,
Ephemeral: e.Ephemeral,
}
}
// lookupPeerAuthFromCache checks whether the peer-sync cache holds a complete
// entry for this peer with a matching metaHash, so the Sync handler can skip
// the pre-fast-path GetPeerAuthInfo store read. Returns hit=false whenever
// the fast path is disabled, the peer is Android, the cache is empty, the
// entry is from an older shape without snapshot fields, or metaHash differs.
func (s *Server) lookupPeerAuthFromCache(peerPubKey string, incomingMetaHash uint64, goOS string) (peerSyncEntry, bool) {
if s.peerSerialCache == nil {
return peerSyncEntry{}, false
}
if !s.fastPathFlag.Enabled() {
return peerSyncEntry{}, false
}
if strings.EqualFold(goOS, "android") {
return peerSyncEntry{}, false
}
entry, hit := s.peerSerialCache.Get(peerPubKey)
if !hit || !entry.IsComplete() {
return peerSyncEntry{}, false
}
if entry.MetaHash != incomingMetaHash {
return peerSyncEntry{}, false
}
return entry, true
}
// recordFastPathSkip bumps the slow-path sync counter with a reason label.
// Used from every early-return site in tryFastPathSync so Grafana can graph
// fast-path misses by cause. No log is emitted here — a single
// "Sync (fast path) skipped reason=<reason>" line is written by the Sync
// handler in server.go once tryFastPathSync returns so a grep for
// "Sync (fast path)" finds both the success and the skip outcomes in one
// shot.
func (s *Server) recordFastPathSkip(_ context.Context, reason string) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSlowPathSync(reason)
}
}
// fastPathSkipReason returns a short reason tag when the eligibility check
// fails, or "" when the fast path should run. Mirrors shouldSkipNetworkMap's
// logic but attributes each individual failure condition so callers can log
// a histogram of fast-path misses.
func fastPathSkipReason(hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) string {
if !hit {
return "cache_miss"
}
if cached.Serial == 0 {
return "cached_serial_zero"
}
if cached.Serial != currentSerial {
return "serial_mismatch"
}
if cached.MetaHash != incomingMetaHash {
return "meta_mismatch"
}
return ""
}
// shouldSkipNetworkMap reports whether a Sync request from this peer can be
// answered with a lightweight NetbirdConfig-only response instead of a full
// map computation. All conditions must hold:
// - the peer is not Android (Android's GrpcClient.GetNetworkMap errors on nil map)
// - the cache holds an entry for this peer
// - the cached serial matches the current account serial
// - the cached meta hash matches the incoming meta hash
// - the cached serial is non-zero (guard against uninitialised entries)
func shouldSkipNetworkMap(goOS string, hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) bool {
if strings.EqualFold(goOS, "android") {
return false
}
if !hit {
return false
}
if cached.Serial == 0 {
return false
}
if cached.Serial != currentSerial {
return false
}
if cached.MetaHash != incomingMetaHash {
return false
}
return true
}
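The two predicates are meant to agree: for a non-Android peer, shouldSkipNetworkMap returns true exactly when fastPathSkipReason returns the empty string. Copying both into a standalone sketch (with peerSyncEntry trimmed to the two fields they read) makes that invariant checkable:

```go
package main

import (
	"fmt"
	"strings"
)

type peerSyncEntry struct{ Serial, MetaHash uint64 }

// fastPathSkipReason and shouldSkipNetworkMap copied from the diff above.
func fastPathSkipReason(hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) string {
	if !hit {
		return "cache_miss"
	}
	if cached.Serial == 0 {
		return "cached_serial_zero"
	}
	if cached.Serial != currentSerial {
		return "serial_mismatch"
	}
	if cached.MetaHash != incomingMetaHash {
		return "meta_mismatch"
	}
	return ""
}

func shouldSkipNetworkMap(goOS string, hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) bool {
	if strings.EqualFold(goOS, "android") {
		return false
	}
	if !hit {
		return false
	}
	if cached.Serial == 0 {
		return false
	}
	if cached.Serial != currentSerial {
		return false
	}
	if cached.MetaHash != incomingMetaHash {
		return false
	}
	return true
}

// agreeForNonAndroid checks the invariant over every failure mode plus the
// clean hit: skip must hold exactly when the reason tag is empty.
func agreeForNonAndroid() bool {
	cases := []struct {
		hit          bool
		cached       peerSyncEntry
		serial, meta uint64
	}{
		{false, peerSyncEntry{}, 42, 7},                       // cache miss
		{true, peerSyncEntry{Serial: 0, MetaHash: 7}, 0, 7},   // uninitialised serial
		{true, peerSyncEntry{Serial: 41, MetaHash: 7}, 42, 7}, // serial mismatch
		{true, peerSyncEntry{Serial: 42, MetaHash: 7}, 42, 9}, // meta mismatch
		{true, peerSyncEntry{Serial: 42, MetaHash: 7}, 42, 7}, // clean hit
	}
	for _, tc := range cases {
		skip := shouldSkipNetworkMap("linux", tc.hit, tc.cached, tc.serial, tc.meta)
		reason := fastPathSkipReason(tc.hit, tc.cached, tc.serial, tc.meta)
		if skip != (reason == "") {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println("predicates agree:", agreeForNonAndroid())
}
```

The Android guard is the one condition the reason function deliberately omits, since tryFastPathSync checks it before the eligibility comparison.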
// extraSettingsFetcher is the dependency used by buildFastPathResponse to
// obtain ExtraSettings for the peer's account. Matches the shape of the
// method on settings.Manager but as a plain function so production callers
// can wrap it with a cache and tests can inject a stub.
type extraSettingsFetcher func(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error)
// buildFastPathResponse constructs a SyncResponse containing only NetbirdConfig
// with fresh TURN/Relay tokens, mirroring the shape used by
// TimeBasedAuthSecretsManager when pushing token refreshes. The response omits
// NetworkMap, PeerConfig, Checks and RemotePeers; the client keeps its existing
// state and only refreshes its control-plane credentials.
func buildFastPathResponse(
ctx context.Context,
cfg *nbconfig.Config,
secrets SecretsManager,
fetchExtraSettings extraSettingsFetcher,
fetchGroups peerGroupFetcher,
peer *nbpeer.Peer,
) *proto.SyncResponse {
var turnToken *Token
if cfg != nil && cfg.TURNConfig != nil && cfg.TURNConfig.TimeBasedCredentials {
if t, err := secrets.GenerateTurnToken(); err == nil {
turnToken = t
} else {
log.WithContext(ctx).Warnf("fast path: generate TURN token: %v", err)
}
}
var relayToken *Token
if cfg != nil && cfg.Relay != nil && len(cfg.Relay.Addresses) > 0 {
if t, err := secrets.GenerateRelayToken(); err == nil {
relayToken = t
} else {
log.WithContext(ctx).Warnf("fast path: generate relay token: %v", err)
}
}
var extraSettings *nbtypes.ExtraSettings
if fetchExtraSettings != nil {
if es, err := fetchExtraSettings(ctx, peer.AccountID); err != nil {
log.WithContext(ctx).Debugf("fast path: get extra settings: %v", err)
} else {
extraSettings = es
}
}
nbConfig := toNetbirdConfig(cfg, turnToken, relayToken, extraSettings)
var peerGroups []string
if fetchGroups != nil {
if ids, err := fetchGroups(ctx, peer.AccountID, peer.ID); err != nil {
log.WithContext(ctx).Debugf("fast path: get peer group ids: %v", err)
} else {
peerGroups = ids
}
}
extendStart := time.Now()
nbConfig = integrationsConfig.ExtendNetBirdConfig(peer.ID, peerGroups, nbConfig, extraSettings)
log.WithContext(ctx).Debugf("fast path: ExtendNetBirdConfig took %s", time.Since(extendStart))
return &proto.SyncResponse{NetbirdConfig: nbConfig}
}
// fetchExtraSettings returns a cached ExtraSettings when available, falling
// back to the settings manager on miss. Populates the cache on miss so
// subsequent fast-path Syncs hit it.
func (s *Server) fetchExtraSettings(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error) {
if es, ok := s.extraSettingsCache.get(accountID); ok {
log.WithContext(ctx).Debugf("fast path: GetExtraSettings skipped (cache hit)")
return es, nil
}
start := time.Now()
es, err := s.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil {
return nil, err
}
log.WithContext(ctx).Debugf("fast path: GetExtraSettings took %s", time.Since(start))
s.extraSettingsCache.set(accountID, es)
return es, nil
}
// tryFastPathSync decides whether the current Sync can be answered with a
// lightweight NetbirdConfig-only response. When the fast path runs, it takes
// over the whole Sync handler (MarkPeerConnected, send, OnPeerConnected,
// SetupRefresh, handleUpdates) and the returned took value is true.
//
// When took is true the caller must return the accompanying err. When took is
// false the caller falls through to the existing slow path and should log
// "Sync (fast path) skipped reason=<skipReason>" so a single grep on
// "Sync (fast path)" finds both the fast-path successes and misses.
func (s *Server) tryFastPathSync(
ctx context.Context,
reqStart, syncStart time.Time,
accountID string,
peerKey wgtypes.Key,
peerMeta nbpeer.PeerSystemMeta,
realIP net.IP,
peerMetaHash uint64,
srv proto.ManagementService_SyncServer,
unlock *func(),
) (took bool, skipReason string, err error) {
if s.peerSerialCache == nil {
s.recordFastPathSkip(ctx, "cache_disabled")
return false, "cache_disabled", nil
}
if !s.fastPathFlag.Enabled() {
s.recordFastPathSkip(ctx, "flag_off")
return false, "flag_off", nil
}
if strings.EqualFold(peerMeta.GoOS, "android") {
s.recordFastPathSkip(ctx, "android")
return false, "android", nil
}
networkStart := time.Now()
currentSerial, err := s.accountManager.GetStore().GetAccountNetworkSerial(ctx, store.LockingStrengthNone, accountID)
if err != nil {
log.WithContext(ctx).Debugf("fast path: account network serial lookup error: %v", err)
s.recordFastPathSkip(ctx, "account_network_error")
return false, "account_network_error", nil
}
log.WithContext(ctx).Debugf("fast path: initial GetAccountNetworkSerial took %s", time.Since(networkStart))
eligibilityStart := time.Now()
cached, hit := s.peerSerialCache.Get(peerKey.String())
if reason := fastPathSkipReason(hit, cached, currentSerial, peerMetaHash); reason != "" {
log.WithContext(ctx).Debugf("fast path: eligibility check took %s", time.Since(eligibilityStart))
s.recordFastPathSkip(ctx, reason)
return false, reason, nil
}
log.WithContext(ctx).Debugf("fast path: eligibility check (hit) took %s", time.Since(eligibilityStart))
var cachedPeer *nbpeer.Peer
if cached.IsComplete() {
cachedPeer = cached.PeerSnapshot()
}
peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, cachedPeer)
if !committed {
s.recordFastPathSkip(ctx, "commit_failed")
return false, "commit_failed", nil
}
// Upgrade the cache only when we had to fetch the peer from the store
// this Sync. In the steady state the cached snapshot lacks UserID (not
// part of PeerSnapshot), so rewriting from it would flip HasUser to
// false and corrupt the entry. A cache-served peer also means the
// entry is already in the full shape, so there's nothing to upgrade.
upgradeCache := cachedPeer == nil
return true, "", s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, cached.Serial, peerMetaHash, upgradeCache, srv, unlock)
}
// commitFastPath subscribes the peer to network-map updates and marks it
// connected. When cachedPeer is non-nil (cache hit with a complete entry),
// the expensive GetPeerByPeerPubKey store call is skipped and the cached
// snapshot is used instead.
//
// It relies on the same eventual-consistency guarantee as the slow path: a
// concurrent writer's broadcast may race the subscription, but any subsequent
// serial change reaches the subscribed peer via its update channel, and a
// reconnect with a stale cached serial falls through to the slow path on the
// next Sync. Returns committed=false on any failure that should not block
// the slow path from running.
func (s *Server) commitFastPath(
ctx context.Context,
accountID string,
peerKey wgtypes.Key,
realIP net.IP,
syncStart time.Time,
cachedPeer *nbpeer.Peer,
) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) {
commitStart := time.Now()
defer func() {
log.WithContext(ctx).Debugf("fast path: commitFastPath took %s", time.Since(commitStart))
}()
var peer *nbpeer.Peer
if cachedPeer != nil {
peer = cachedPeer
log.WithContext(ctx).Debugf("fast path: GetPeerByPeerPubKey skipped (cache hit)")
} else {
getPeerStart := time.Now()
p, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
if err != nil {
log.WithContext(ctx).Debugf("fast path: lookup peer %s: %v", peerKey.String(), err)
return nil, nil, false
}
log.WithContext(ctx).Debugf("fast path: GetPeerByPeerPubKey took %s", time.Since(getPeerStart))
peer = p
}
onConnectedStart := time.Now()
updates, err := s.networkMapController.OnPeerConnectedWithPeer(ctx, accountID, peer)
if err != nil {
log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err)
return nil, nil, false
}
log.WithContext(ctx).Debugf("fast path: OnPeerConnectedWithPeer took %s", time.Since(onConnectedStart))
s.markPeerConnectedAsync(peerKey.String(), realIP, accountID, syncStart)
return peer, updates, true
}
// markPeerConnectedAsync fires MarkPeerConnected in a detached goroutine so
// the Sync hot path does not wait on a DB write that can spike into the
// multi-second range under contention. LastSeen becomes eventually consistent,
// stale by at most one write; the peer's next Sync or the per-peer expiration
// routines correct any drift. Concurrent fast-path Syncs for the same peer
// coalesce to a single background write via the inflight map.
func (s *Server) markPeerConnectedAsync(peerKey string, realIP net.IP, accountID string, syncStart time.Time) {
if _, loaded := s.inflightMarkPeerConnected.LoadOrStore(peerKey, struct{}{}); loaded {
log.Debugf("fast path: async MarkPeerConnected for %s coalesced (already in flight)", peerKey)
return
}
go func() {
defer s.inflightMarkPeerConnected.Delete(peerKey)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
start := time.Now()
if err := s.accountManager.MarkPeerConnected(ctx, peerKey, true, realIP, accountID, syncStart); err != nil {
log.Warnf("fast path: async MarkPeerConnected for %s: %v", peerKey, err)
return
}
log.Debugf("fast path: async MarkPeerConnected for %s took %s", peerKey, time.Since(start))
}()
}
// runFastPathSync executes the fast path: send the lean response, kick off
// token refresh, release the per-peer lock, then block on handleUpdates until
// the stream is closed. Peer lookup and subscription were already performed
// by commitFastPath, which closes the race between the eligibility check and
// the subscription.
func (s *Server) runFastPathSync(
ctx context.Context,
reqStart, syncStart time.Time,
accountID string,
peerKey wgtypes.Key,
peer *nbpeer.Peer,
updates chan *network_map.UpdateMessage,
serial uint64,
peerMetaHash uint64,
upgradeCache bool,
srv proto.ManagementService_SyncServer,
unlock *func(),
) error {
sendStart := time.Now()
if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil {
log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err)
s.syncSem.Add(-1)
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return err
}
log.WithContext(ctx).Debugf("fast path: sendFastPathResponse took %s", time.Since(sendStart))
// Upgrade a legacy-shape cache entry (Serial + MetaHash only, pre step 2)
// to the full shape so the next Sync's lookupPeerAuthFromCache +
// commitFastPath can actually short-circuit the pre-fast-path
// GetPeerAuthInfo and GetPeerByPeerPubKey. Only runs when the peer was
// freshly fetched from the store this Sync — rewriting from a cached
// snapshot would lose HasUser because PeerSnapshot doesn't carry UserID.
if upgradeCache {
s.writePeerSyncEntry(peerKey.String(), serial, peerMetaHash, peer)
}
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
if unlock != nil && *unlock != nil {
(*unlock)()
*unlock = nil
}
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID)
s.appMetrics.GRPCMetrics().CountFastPathSync()
}
log.WithContext(ctx).Debugf("Sync (fast path) took %s", time.Since(reqStart))
s.syncSem.Add(-1)
return s.handleUpdates(ctx, accountID, peerKey, peer, peerMetaHash, updates, srv, syncStart)
}
// sendFastPathResponse builds a NetbirdConfig-only SyncResponse, encrypts it
// with the peer's WireGuard key and pushes it over the stream.
func (s *Server) sendFastPathResponse(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, srv proto.ManagementService_SyncServer) error {
resp := buildFastPathResponse(ctx, s.config, s.secretsManager, s.fetchExtraSettings, s.fetchPeerGroups, peer)
key, err := s.secretsManager.GetWGKey()
if err != nil {
return status.Errorf(codes.Internal, "failed getting server key")
}
body, err := encryption.EncryptMessage(peerKey, key, resp)
if err != nil {
return status.Errorf(codes.Internal, "error encrypting fast-path sync response")
}
if err := srv.Send(&proto.EncryptedMessage{
WgPubKey: key.PublicKey().String(),
Body: body,
}); err != nil {
log.WithContext(ctx).Errorf("failed sending fast-path sync response: %v", err)
return status.Errorf(codes.Internal, "error handling request")
}
return nil
}
// fetchPeerGroups returns a cached list of group IDs for the peer when
// available, falling back to the account manager's store on miss. Populates
// the cache on miss so subsequent fast-path Syncs hit it.
func (s *Server) fetchPeerGroups(ctx context.Context, accountID, peerID string) ([]string, error) {
if ids, ok := s.peerGroupsCache.get(peerID); ok {
log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs skipped (cache hit)")
return ids, nil
}
start := time.Now()
ids, err := s.accountManager.GetStore().GetPeerGroupIDs(ctx, store.LockingStrengthNone, accountID, peerID)
if err != nil {
return nil, err
}
log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs took %s", time.Since(start))
s.peerGroupsCache.set(peerID, ids)
return ids, nil
}
// recordPeerSyncEntry writes the serial just delivered to this peer so a
// subsequent reconnect can take the fast path. Called after the slow path's
// sendInitialSync has pushed a full map. A nil cache disables the fast path.
// peer is required so the cached entry carries the snapshot fields the
// pure-cache fast path needs (AccountID, PeerID, Key, Ephemeral, HasUser).
func (s *Server) recordPeerSyncEntry(peerKey string, netMap *nbtypes.NetworkMap, peerMetaHash uint64, peer *nbpeer.Peer) {
if netMap == nil || netMap.Network == nil {
return
}
s.writePeerSyncEntry(peerKey, netMap.Network.CurrentSerial(), peerMetaHash, peer)
}
// recordPeerSyncEntryFromUpdate is the sendUpdate equivalent of
// recordPeerSyncEntry: it extracts the serial from a streamed NetworkMap update
// so the cache stays in sync with what the peer most recently received.
func (s *Server) recordPeerSyncEntryFromUpdate(peerKey string, update *network_map.UpdateMessage, peerMetaHash uint64, peer *nbpeer.Peer) {
if update == nil || update.Update == nil || update.Update.NetworkMap == nil {
return
}
s.writePeerSyncEntry(peerKey, update.Update.NetworkMap.GetSerial(), peerMetaHash, peer)
}
// writePeerSyncEntry is the common cache write used by every path that
// delivers state to a peer: the slow-path sendInitialSync, streamed
// NetworkMap updates, and the fast path itself. Writing from the fast path
// upgrades legacy-shape entries (Serial + MetaHash only, pre step 2) to the
// full shape on the next successful Sync so subsequent cache hits can
// actually short-circuit GetPeerAuthInfo and GetPeerByPeerPubKey.
func (s *Server) writePeerSyncEntry(peerKey string, serial, peerMetaHash uint64, peer *nbpeer.Peer) {
if s.peerSerialCache == nil {
return
}
if !s.fastPathFlag.Enabled() {
return
}
if serial == 0 {
return
}
s.peerSerialCache.Set(peerKey, newPeerSyncEntry(serial, peerMetaHash, peer))
}
// newPeerSyncEntry builds a cache entry with every field the pure-cache
// fast path needs. peer may be nil (very old call sites), in which case the
// entry is written without the snapshot fields and will fail IsComplete().
func newPeerSyncEntry(serial, metaHash uint64, peer *nbpeer.Peer) peerSyncEntry {
entry := peerSyncEntry{
Serial: serial,
MetaHash: metaHash,
}
if peer != nil {
entry.AccountID = peer.AccountID
entry.PeerID = peer.ID
entry.PeerKey = peer.Key
entry.Ephemeral = peer.Ephemeral
entry.HasUser = peer.UserID != ""
}
return entry
}
// invalidatePeerSyncEntry is called after a successful Login so the next Sync
// is guaranteed to deliver a full map, picking up whatever changed in the
// login (SSH key rotation, approval state, user binding, etc.).
func (s *Server) invalidatePeerSyncEntry(peerKey string) {
if s.peerSerialCache == nil {
return
}
s.peerSerialCache.Delete(peerKey)
}

View File

@@ -0,0 +1,163 @@
package grpc
import (
"context"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/groups"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util"
)
func fastPathTestPeer() *nbpeer.Peer {
return &nbpeer.Peer{
ID: "peer-id",
AccountID: "account-id",
Key: "pubkey",
}
}
func fastPathTestSecrets(t *testing.T, turn *config.TURNConfig, relay *config.Relay) *TimeBasedAuthSecretsManager {
t.Helper()
peersManager := update_channel.NewPeersUpdateManager(nil)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMock := settings.NewMockManager(ctrl)
secrets, err := NewTimeBasedAuthSecretsManager(peersManager, turn, relay, settingsMock, groups.NewManagerMock())
require.NoError(t, err, "secrets manager initialisation must succeed")
return secrets
}
func noGroupsFetcher(context.Context, string, string) ([]string, error) {
return nil, nil
}
func TestBuildFastPathResponse_TimeBasedTURNAndRelay_FreshTokens(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
turnCfg := &config.TURNConfig{
CredentialsTTL: ttl,
Secret: "turn-secret",
Turns: []*config.Host{TurnTestHost},
TimeBasedCredentials: true,
}
relayCfg := &config.Relay{
Addresses: []string{"rel.example:443"},
CredentialsTTL: ttl,
Secret: "relay-secret",
}
cfg := &config.Config{
TURNConfig: turnCfg,
Relay: relayCfg,
Signal: &config.Host{URI: "signal.example:443", Proto: config.HTTPS},
Stuns: []*config.Host{{URI: "stun.example:3478", Proto: config.UDP}},
}
secrets := fastPathTestSecrets(t, turnCfg, relayCfg)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), "account-id").Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp, "response must not be nil")
assert.Nil(t, resp.NetworkMap, "fast path must omit NetworkMap")
assert.Nil(t, resp.PeerConfig, "fast path must omit PeerConfig")
assert.Empty(t, resp.Checks, "fast path must omit posture checks")
assert.Empty(t, resp.RemotePeers, "fast path must omit remote peers")
require.NotNil(t, resp.NetbirdConfig, "NetbirdConfig must be present on fast path")
require.Len(t, resp.NetbirdConfig.Turns, 1, "time-based TURN credentials must be present")
assert.NotEmpty(t, resp.NetbirdConfig.Turns[0].User, "TURN user must be populated")
assert.NotEmpty(t, resp.NetbirdConfig.Turns[0].Password, "TURN password must be populated")
require.NotNil(t, resp.NetbirdConfig.Relay, "Relay config must be present when configured")
assert.NotEmpty(t, resp.NetbirdConfig.Relay.TokenPayload, "relay token payload must be populated")
assert.NotEmpty(t, resp.NetbirdConfig.Relay.TokenSignature, "relay token signature must be populated")
assert.Equal(t, []string{"rel.example:443"}, resp.NetbirdConfig.Relay.Urls, "relay URLs passthrough")
require.NotNil(t, resp.NetbirdConfig.Signal, "Signal config must be present when configured")
assert.Equal(t, "signal.example:443", resp.NetbirdConfig.Signal.Uri, "signal URI passthrough")
require.Len(t, resp.NetbirdConfig.Stuns, 1, "STUNs must be passed through")
assert.Equal(t, "stun.example:3478", resp.NetbirdConfig.Stuns[0].Uri, "STUN URI passthrough")
}
func TestBuildFastPathResponse_StaticTURNCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
staticHost := &config.Host{
URI: "turn:static.example:3478",
Proto: config.UDP,
Username: "preset-user",
Password: "preset-pass",
}
turnCfg := &config.TURNConfig{
CredentialsTTL: ttl,
Secret: "turn-secret",
Turns: []*config.Host{staticHost},
TimeBasedCredentials: false,
}
cfg := &config.Config{TURNConfig: turnCfg}
// Use a relay-free secrets manager; static TURN path does not consult it.
secrets := fastPathTestSecrets(t, turnCfg, nil)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp.NetbirdConfig)
require.Len(t, resp.NetbirdConfig.Turns, 1, "static TURN must appear in response")
assert.Equal(t, "preset-user", resp.NetbirdConfig.Turns[0].User, "static user passthrough")
assert.Equal(t, "preset-pass", resp.NetbirdConfig.Turns[0].Password, "static password passthrough")
assert.Nil(t, resp.NetbirdConfig.Relay, "no Relay when Relay config is nil")
}
func TestBuildFastPathResponse_NoRelayConfigured_NoRelaySection(t *testing.T) {
cfg := &config.Config{}
secrets := fastPathTestSecrets(t, nil, nil)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp.NetbirdConfig, "NetbirdConfig must be non-nil even without relay/turn")
assert.Nil(t, resp.NetbirdConfig.Relay, "Relay must be absent when not configured")
assert.Empty(t, resp.NetbirdConfig.Turns, "Turns must be empty when not configured")
}
func TestBuildFastPathResponse_ExtraSettingsErrorStillReturnsResponse(t *testing.T) {
cfg := &config.Config{}
secrets := fastPathTestSecrets(t, nil, nil)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(nil, assertAnError).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp, "extra settings failure must degrade gracefully into an empty fast-path response")
assert.Nil(t, resp.NetworkMap, "NetworkMap still omitted on degraded path")
}
// assertAnError is a sentinel used by fast-path tests that need to simulate a
// dependency failure without caring about the error value.
var assertAnError = errForTests("simulated")
type errForTests string
func (e errForTests) Error() string { return string(e) }

View File

@@ -30,6 +30,7 @@ import (
"github.com/netbirdio/netbird/formatter/hook"
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/internals/shared/fastpathcache"
"github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity"
nbcache "github.com/netbirdio/netbird/management/server/cache"
@@ -111,6 +112,11 @@ type DefaultAccountManager struct {
permissionsManager permissions.Manager
disableDefaultPolicy bool
// sharedCacheStore is retained so mutation paths can invalidate the
// Sync fast-path caches (ExtraSettings, peer-groups) without a circular
// dependency on the gRPC server package that owns the read-side wrappers.
sharedCacheStore cacheStore.StoreInterface
}
var _ account.Manager = (*DefaultAccountManager)(nil)
@@ -250,6 +256,7 @@ func BuildManager(
am.externalCacheManager = nbcache.NewUserDataCache(sharedCacheStore)
am.cacheManager = nbcache.NewAccountUserDataCache(am.loadAccount, sharedCacheStore)
am.sharedCacheStore = sharedCacheStore
if !isNil(am.idpManager) && !IsEmbeddedIdp(am.idpManager) {
go func() {
@@ -368,6 +375,9 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
if err != nil {
return nil, err
}
if extraSettingsChanged {
fastpathcache.InvalidateExtraSettings(ctx, am.sharedCacheStore, accountID)
}
am.handleRoutingPeerDNSResolutionSettings(ctx, oldSettings, newSettings, userID, accountID)
am.handleLazyConnectionSettings(ctx, oldSettings, newSettings, userID, accountID)
@@ -2287,3 +2297,9 @@ func (am *DefaultAccountManager) savePeerIPUpdate(ctx context.Context, transacti
func (am *DefaultAccountManager) GetUserIDByPeerKey(ctx context.Context, peerKey string) (string, error) {
return am.Store.GetUserIDByPeerKey(ctx, store.LockingStrengthNone, peerKey)
}
// GetPeerAuthInfo returns the userID and accountID for a peer in a single
// store call. Used by the Sync hot path to collapse two lookups into one.
func (am *DefaultAccountManager) GetPeerAuthInfo(ctx context.Context, peerKey string) (string, string, error) {
return am.Store.GetPeerAuthInfoByPubKey(ctx, store.LockingStrengthNone, peerKey)
}

View File

@@ -134,6 +134,9 @@ type Manager interface {
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth auth.UserAuth) (*users.UserInfoWithPermissions, error)
GetUserIDByPeerKey(ctx context.Context, peerKey string) (string, error)
// GetPeerAuthInfo returns the userID and accountID for a peer in a single
// store call. Used by the Sync hot path to collapse two lookups into one.
GetPeerAuthInfo(ctx context.Context, peerKey string) (userID, accountID string, err error)
GetIdentityProvider(ctx context.Context, accountID, idpID, userID string) (*types.IdentityProvider, error)
GetIdentityProviders(ctx context.Context, accountID, userID string) ([]*types.IdentityProvider, error)
CreateIdentityProvider(ctx context.Context, accountID, userID string, idp *types.IdentityProvider) (*types.IdentityProvider, error)

View File

@@ -900,6 +900,22 @@ func (mr *MockManagerMockRecorder) GetPeer(ctx, accountID, peerID, userID interf
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeer", reflect.TypeOf((*MockManager)(nil).GetPeer), ctx, accountID, peerID, userID)
}
// GetPeerAuthInfo mocks base method.
func (m *MockManager) GetPeerAuthInfo(ctx context.Context, peerKey string) (string, string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPeerAuthInfo", ctx, peerKey)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(string)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetPeerAuthInfo indicates an expected call of GetPeerAuthInfo.
func (mr *MockManagerMockRecorder) GetPeerAuthInfo(ctx, peerKey interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeerAuthInfo", reflect.TypeOf((*MockManager)(nil).GetPeerAuthInfo), ctx, peerKey)
}
// GetPeerGroups mocks base method.
func (m *MockManager) GetPeerGroups(ctx context.Context, accountID, peerID string) ([]*types.Group, error) {
m.ctrl.T.Helper()

View File

@@ -391,7 +391,9 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
return nil, nil, "", cleanup, err
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil)
peerSerialCache := nbgrpc.NewPeerSerialCache(ctx, cacheStore, time.Minute)
fastPathFlag := nbgrpc.NewFastPathFlag(true)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag, cacheStore)
if err != nil {
return nil, nil, "", cleanup, err
}

View File

@@ -256,6 +256,9 @@ func startServer(
server.MockIntegratedValidator{},
networkMapController,
nil,
nil,
nil,
nil,
)
if err != nil {
t.Fatalf("failed creating management server: %v", err)

View File

@@ -1084,6 +1084,20 @@ func (am *MockAccountManager) GetUserIDByPeerKey(ctx context.Context, peerKey st
return "something", nil
}
// GetPeerAuthInfo mocks GetPeerAuthInfo of the AccountManager interface by
// delegating to GetUserIDByPeerKey and GetAccountIDForPeerKey.
func (am *MockAccountManager) GetPeerAuthInfo(ctx context.Context, peerKey string) (string, string, error) {
userID, err := am.GetUserIDByPeerKey(ctx, peerKey)
if err != nil {
return "", "", err
}
accountID, err := am.GetAccountIDForPeerKey(ctx, peerKey)
if err != nil {
return "", "", err
}
return userID, accountID, nil
}
// GetIdentityProvider mocks GetIdentityProvider of the AccountManager interface
func (am *MockAccountManager) GetIdentityProvider(ctx context.Context, accountID, idpID, userID string) (*types.IdentityProvider, error) {
if am.GetIdentityProviderFunc != nil {

View File

@@ -119,6 +119,13 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
log.WithContext(ctx).Infof("Set max open db connections to %d, max idle to %d, max lifetime to %v, max idle time to %v",
conns, conns, time.Hour, 3*time.Minute)
// Periodically log pool stats so we can see saturation in the regular
// management.log alongside the fast-path timing lines. Disable by setting
// NB_SQL_POOL_STATS_INTERVAL=0; default 30s.
if interval := poolStatsInterval(); interval > 0 {
go logSqlPoolStats(ctx, sql, string(storeEngine), interval)
}
if skipMigration {
log.WithContext(ctx).Infof("skipping migration")
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1, transactionTimeout: transactionTimeout}, nil
@@ -2658,6 +2665,30 @@ func (s *SqlStore) GetAccountNetwork(ctx context.Context, lockStrength LockingSt
return accountNetwork.Network, nil
}
// GetAccountNetworkSerial returns only the network.serial column for an
// account, avoiding the overhead of materialising the full Network struct
// (which carries a JSON-serialised CIDR and other columns). Used by the Sync
// fast path to check whether the peer's cached serial still matches without
// paying the full-row read cost on contended DBs.
func (s *SqlStore) GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountID string) (uint64, error) {
tx := s.db
if lockStrength != LockingStrengthNone {
tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)})
}
var serial uint64
if err := tx.Model(&types.Account{}).
Select("network_serial").
Where(idQueryCondition, accountID).
Take(&serial).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return 0, status.NewAccountNotFoundError(accountID)
}
return 0, status.Errorf(status.Internal, "issue getting network serial from store: %s", err)
}
return serial, nil
}
func (s *SqlStore) GetPeerByPeerPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (*nbpeer.Peer, error) {
tx := s.db
if lockStrength != LockingStrengthNone {
@@ -4685,6 +4716,33 @@ func (s *SqlStore) GetUserIDByPeerKey(ctx context.Context, lockStrength LockingS
return userID, nil
}
// GetPeerAuthInfoByPubKey returns the user_id and account_id for a peer in a
// single SELECT. Used by the Sync hot path to replace the back-to-back
// GetUserIDByPeerKey + GetAccountIDByPeerPubKey calls.
func (s *SqlStore) GetPeerAuthInfoByPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (string, string, error) {
tx := s.db
if lockStrength != LockingStrengthNone {
tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)})
}
var row struct {
UserID string
AccountID string
}
result := tx.Model(&nbpeer.Peer{}).
Select("user_id", "account_id").
Take(&row, GetKeyQueryCondition(s), peerKey)
if result.Error != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return "", "", status.Errorf(status.NotFound, "peer not found: index lookup failed")
}
return "", "", status.Errorf(status.Internal, "failed to get peer auth info by peer key")
}
return row.UserID, row.AccountID, nil
}
func (s *SqlStore) CreateZone(ctx context.Context, zone *zones.Zone) error {
result := s.db.Create(zone)
if result.Error != nil {
@@ -5695,3 +5753,59 @@ func (s *SqlStore) GetRoutingPeerNetworks(_ context.Context, accountID, peerID s
return names, nil
}
// poolStatsInterval reads the polling cadence from NB_SQL_POOL_STATS_INTERVAL
// (Go duration string, e.g. "30s"). Returns 0 to disable the loop. Default
// is 30s — small enough to catch a saturation event in a Grafana panel
// scraping logs, large enough to avoid log spam.
func poolStatsInterval() time.Duration {
v := os.Getenv("NB_SQL_POOL_STATS_INTERVAL")
if v == "" {
return 30 * time.Second
}
d, err := time.ParseDuration(v)
if err != nil {
return 30 * time.Second
}
return d
}
// logSqlPoolStats periodically logs database/sql pool stats so saturation is
// visible in management.log. Logs the deltas of WaitCount and WaitDuration
// (both are cumulative counters in db.Stats) so each line is interpretable
// without subtracting against the previous one.
//
// Format mirrors the existing fast-path timing lines:
//
// db pool: engine=postgres open=10 in_use=8 idle=2 wait_delta=42 wait_dur_delta=1.2s max=10
//
// At WARN level when waits accumulate (a real saturation signal), DEBUG when
// the pool is healthy.
func logSqlPoolStats(ctx context.Context, db *sql.DB, engine string, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastWaitCount int64
var lastWaitDuration time.Duration
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s := db.Stats()
waitDelta := s.WaitCount - lastWaitCount
durDelta := s.WaitDuration - lastWaitDuration
lastWaitCount = s.WaitCount
lastWaitDuration = s.WaitDuration
line := fmt.Sprintf("db pool: engine=%s open=%d in_use=%d idle=%d wait_delta=%d wait_dur_delta=%s max=%d",
engine, s.OpenConnections, s.InUse, s.Idle, waitDelta, durDelta, s.MaxOpenConnections)
if waitDelta > 0 {
log.Warnf("%s", line)
} else {
log.Debugf("%s", line)
}
}
}
}

View File

@@ -188,6 +188,10 @@ type Store interface {
GetTakenIPs(ctx context.Context, lockStrength LockingStrength, accountId string) ([]net.IP, error)
IncrementNetworkSerial(ctx context.Context, accountId string) error
GetAccountNetwork(ctx context.Context, lockStrength LockingStrength, accountId string) (*types.Network, error)
// GetAccountNetworkSerial returns only the network.serial column; used by
// the Sync fast path to skip the full-row GetAccountNetwork read when all
// we need is the serial.
GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountId string) (uint64, error)
GetInstallationID() string
SaveInstallationID(ctx context.Context, ID string) error
@@ -230,6 +234,10 @@ type Store interface {
// SetFieldEncrypt sets the field encryptor for encrypting sensitive user data.
SetFieldEncrypt(enc *crypt.FieldEncrypt)
GetUserIDByPeerKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (string, error)
// GetPeerAuthInfoByPubKey returns the userID and accountID for a peer in a
// single query, replacing the pattern of calling GetUserIDByPeerKey and
// GetAccountIDByPeerPubKey back-to-back on the Sync hot path.
GetPeerAuthInfoByPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (userID, accountID string, err error)
CreateZone(ctx context.Context, zone *zones.Zone) error
UpdateZone(ctx context.Context, zone *zones.Zone) error

View File

@@ -165,19 +165,6 @@ func (mr *MockStoreMockRecorder) CleanupStaleProxies(ctx, inactivityDuration int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupStaleProxies", reflect.TypeOf((*MockStore)(nil).CleanupStaleProxies), ctx, inactivityDuration)
}
// GetClusterSupportsCrowdSec mocks base method.
func (m *MockStore) GetClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetClusterSupportsCrowdSec", ctx, clusterAddr)
ret0, _ := ret[0].(*bool)
return ret0
}
// GetClusterSupportsCrowdSec indicates an expected call of GetClusterSupportsCrowdSec.
func (mr *MockStoreMockRecorder) GetClusterSupportsCrowdSec(ctx, clusterAddr interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterSupportsCrowdSec", reflect.TypeOf((*MockStore)(nil).GetClusterSupportsCrowdSec), ctx, clusterAddr)
}
// Close mocks base method.
func (m *MockStore) Close(ctx context.Context) error {
m.ctrl.T.Helper()
@@ -1045,6 +1032,21 @@ func (mr *MockStoreMockRecorder) GetAccountNetwork(ctx, lockStrength, accountId
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountNetwork", reflect.TypeOf((*MockStore)(nil).GetAccountNetwork), ctx, lockStrength, accountId)
}
// GetAccountNetworkSerial mocks base method.
func (m *MockStore) GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountId string) (uint64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetAccountNetworkSerial", ctx, lockStrength, accountId)
ret0, _ := ret[0].(uint64)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetAccountNetworkSerial indicates an expected call of GetAccountNetworkSerial.
func (mr *MockStoreMockRecorder) GetAccountNetworkSerial(ctx, lockStrength, accountId interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountNetworkSerial", reflect.TypeOf((*MockStore)(nil).GetAccountNetworkSerial), ctx, lockStrength, accountId)
}
// GetAccountNetworks mocks base method.
func (m *MockStore) GetAccountNetworks(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*types1.Network, error) {
m.ctrl.T.Helper()
@@ -1388,6 +1390,20 @@ func (mr *MockStoreMockRecorder) GetClusterRequireSubdomain(ctx, clusterAddr int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterRequireSubdomain", reflect.TypeOf((*MockStore)(nil).GetClusterRequireSubdomain), ctx, clusterAddr)
}
// GetClusterSupportsCrowdSec mocks base method.
func (m *MockStore) GetClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetClusterSupportsCrowdSec", ctx, clusterAddr)
ret0, _ := ret[0].(*bool)
return ret0
}
// GetClusterSupportsCrowdSec indicates an expected call of GetClusterSupportsCrowdSec.
func (mr *MockStoreMockRecorder) GetClusterSupportsCrowdSec(ctx, clusterAddr interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetClusterSupportsCrowdSec", reflect.TypeOf((*MockStore)(nil).GetClusterSupportsCrowdSec), ctx, clusterAddr)
}
// GetClusterSupportsCustomPorts mocks base method.
func (m *MockStore) GetClusterSupportsCustomPorts(ctx context.Context, clusterAddr string) *bool {
m.ctrl.T.Helper()
@@ -1687,6 +1703,22 @@ func (mr *MockStoreMockRecorder) GetPATByID(ctx, lockStrength, userID, patID int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPATByID", reflect.TypeOf((*MockStore)(nil).GetPATByID), ctx, lockStrength, userID, patID)
}
// GetPeerAuthInfoByPubKey mocks base method.
func (m *MockStore) GetPeerAuthInfoByPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (string, string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPeerAuthInfoByPubKey", ctx, lockStrength, peerKey)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(string)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetPeerAuthInfoByPubKey indicates an expected call of GetPeerAuthInfoByPubKey.
func (mr *MockStoreMockRecorder) GetPeerAuthInfoByPubKey(ctx, lockStrength, peerKey interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeerAuthInfoByPubKey", reflect.TypeOf((*MockStore)(nil).GetPeerAuthInfoByPubKey), ctx, lockStrength, peerKey)
}
// GetPeerByID mocks base method.
func (m *MockStore) GetPeerByID(ctx context.Context, lockStrength LockingStrength, accountID, peerID string) (*peer.Peer, error) {
m.ctrl.T.Helper()

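The commit message describes GetPeerAuthInfoByPubKey as consolidating GetUserIDByPeerKey and GetAccountIDByPeerPubKey into a single SELECT over the same peer row. A minimal sketch of that consolidation against a hypothetical in-memory store (the struct, field names, and the order of the two returned IDs are assumptions for illustration, not the actual store code):

```go
package main

import (
	"errors"
	"fmt"
)

// peerRow stands in for the two columns the consolidated query would
// SELECT by pubkey in one round trip.
type peerRow struct {
	UserID    string
	AccountID string
}

// memStore is a hypothetical in-memory stand-in for the SQL store.
type memStore struct {
	peersByKey map[string]peerRow
}

var errPeerNotFound = errors.New("peer not found")

// getPeerAuthInfoByPubKey returns both auth columns from a single lookup,
// replacing two separate per-column lookups on the same row.
func (s *memStore) getPeerAuthInfoByPubKey(pubKey string) (userID, accountID string, err error) {
	row, ok := s.peersByKey[pubKey]
	if !ok {
		return "", "", errPeerNotFound
	}
	return row.UserID, row.AccountID, nil
}

func main() {
	s := &memStore{peersByKey: map[string]peerRow{
		"pubkey-A": {UserID: "user-1", AccountID: "acc-1"},
	}}
	u, a, err := s.getPeerAuthInfoByPubKey("pubkey-A")
	fmt.Println(u, a, err)
}
```

The point of the shape is that the hot path pays one round trip for both values, which is what lets the Sync handler drop a DB call on cache hit.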
View File

@@ -0,0 +1,297 @@
package server
import (
"context"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/encryption"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/store"
mgmtProto "github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
)
// skipOnWindows skips the calling test on Windows. The in-process gRPC
// harness uses Unix socket / path conventions that do not cleanly map to
// Windows.
func skipOnWindows(t *testing.T) {
t.Helper()
if runtime.GOOS == "windows" {
t.Skip("skipping on windows; harness uses unix path conventions")
}
}
func fastPathTestConfig(t *testing.T) *config.Config {
t.Helper()
return &config.Config{
Datadir: t.TempDir(),
Stuns: []*config.Host{{
Proto: "udp",
URI: "stun:stun.example:3478",
}},
TURNConfig: &config.TURNConfig{
TimeBasedCredentials: true,
CredentialsTTL: util.Duration{Duration: time.Hour},
Secret: "turn-secret",
Turns: []*config.Host{{
Proto: "udp",
URI: "turn:turn.example:3478",
}},
},
Relay: &config.Relay{
Addresses: []string{"rel.example:443"},
CredentialsTTL: util.Duration{Duration: time.Hour},
Secret: "relay-secret",
},
Signal: &config.Host{
Proto: "http",
URI: "signal.example:10000",
},
HttpConfig: nil,
}
}
// openSync opens a Sync stream with the given meta and returns the decoded first
// SyncResponse plus a cancel function. The caller must call cancel() to release
// server-side routines before opening a new stream for the same peer.
func openSync(t *testing.T, client mgmtProto.ManagementServiceClient, serverKey, peerKey wgtypes.Key, meta *mgmtProto.PeerSystemMeta) (*mgmtProto.SyncResponse, context.CancelFunc) {
t.Helper()
req := &mgmtProto.SyncRequest{Meta: meta}
body, err := encryption.EncryptMessage(serverKey, peerKey, req)
require.NoError(t, err, "encrypt sync request")
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.Sync(ctx, &mgmtProto.EncryptedMessage{
WgPubKey: peerKey.PublicKey().String(),
Body: body,
})
require.NoError(t, err, "open sync stream")
enc := &mgmtProto.EncryptedMessage{}
require.NoError(t, stream.RecvMsg(enc), "receive first sync response")
resp := &mgmtProto.SyncResponse{}
require.NoError(t, encryption.DecryptMessage(serverKey, peerKey, enc.Body, resp), "decrypt sync response")
return resp, cancel
}
// waitForPeerDisconnect polls until the account manager reports the peer as
// disconnected (Status.Connected == false), which happens once the server's
// handleUpdates goroutine has run cancelPeerRoutines for the cancelled
// stream. The deadline is bounded so a stuck server fails the test rather
// than hanging. This replaces the former fixed 50ms sleep, which was flaky
// in CI under load or with the race detector enabled.
func waitForPeerDisconnect(t *testing.T, am *DefaultAccountManager, peerPubKey string) {
t.Helper()
require.Eventually(t, func() bool {
peer, err := am.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
if err != nil {
return false
}
return !peer.Status.Connected
}, 2*time.Second, 10*time.Millisecond, "peer %s should be marked disconnected after stream cancel", peerPubKey)
}
func baseLinuxMeta() *mgmtProto.PeerSystemMeta {
return &mgmtProto.PeerSystemMeta{
Hostname: "linux-host",
GoOS: "linux",
OS: "linux",
Platform: "x86_64",
Kernel: "5.15.0",
NetbirdVersion: "0.70.0",
}
}
func androidMeta() *mgmtProto.PeerSystemMeta {
return &mgmtProto.PeerSystemMeta{
Hostname: "android-host",
GoOS: "android",
OS: "android",
Platform: "arm64",
Kernel: "4.19",
NetbirdVersion: "0.70.0",
}
}
func TestSyncFastPath_FirstSync_SendsFullMap(t *testing.T) {
skipOnWindows(t)
mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
resp, cancel := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
defer cancel()
require.NotNil(t, resp.NetworkMap, "first sync for a fresh peer must deliver a full NetworkMap")
assert.NotNil(t, resp.NetbirdConfig, "NetbirdConfig must accompany the full map")
}
func TestSyncFastPath_SecondSync_MatchingSerial_SkipsMap(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
require.NotNil(t, first.NetworkMap, "first sync primes cache with a full map")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
second, cancel2 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
defer cancel2()
assert.Nil(t, second.NetworkMap, "second sync with unchanged state must omit NetworkMap")
require.NotNil(t, second.NetbirdConfig, "fast path must still deliver NetbirdConfig")
assert.NotEmpty(t, second.NetbirdConfig.Turns, "time-based TURN credentials must be refreshed on fast path")
require.NotNil(t, second.NetbirdConfig.Relay, "relay config must be present on fast path")
assert.NotEmpty(t, second.NetbirdConfig.Relay.TokenPayload, "relay token must be refreshed on fast path")
}
func TestSyncFastPath_AndroidNeverSkips(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
first, cancel1 := openSync(t, client, *serverKey, *keys[0], androidMeta())
require.NotNil(t, first.NetworkMap, "android first sync must deliver a full map")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
second, cancel2 := openSync(t, client, *serverKey, *keys[0], androidMeta())
defer cancel2()
require.NotNil(t, second.NetworkMap, "android reconnects must never take the fast path")
}
func TestSyncFastPath_MetaChanged_SendsFullMap(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
require.NotNil(t, first.NetworkMap, "first sync primes cache")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
changed := baseLinuxMeta()
changed.Hostname = "linux-host-renamed"
second, cancel2 := openSync(t, client, *serverKey, *keys[0], changed)
defer cancel2()
require.NotNil(t, second.NetworkMap, "meta change must force a full map even when serial matches")
}
func TestSyncFastPath_LoginInvalidatesCache(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
key, err := wgtypes.GeneratePrivateKey()
require.NoError(t, err)
_, err = loginPeerWithValidSetupKey(key, client)
require.NoError(t, err, "initial login must succeed")
serverKey, err := getServerKey(client)
require.NoError(t, err)
first, cancel1 := openSync(t, client, *serverKey, key, baseLinuxMeta())
require.NotNil(t, first.NetworkMap, "first sync primes cache")
cancel1()
waitForPeerDisconnect(t, am, key.PublicKey().String())
// A subsequent login (e.g. SSH key rotation, re-auth) must clear the cache.
_, err = loginPeerWithValidSetupKey(key, client)
require.NoError(t, err, "second login must succeed")
second, cancel2 := openSync(t, client, *serverKey, key, baseLinuxMeta())
defer cancel2()
require.NotNil(t, second.NetworkMap, "Login must invalidate the cache so the next Sync delivers a full map")
}
func TestSyncFastPath_OtherPeerRegistered_ForcesFullMap(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
require.NotNil(t, first.NetworkMap, "first sync primes cache at serial N")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
// Registering another peer bumps the account serial via IncrementNetworkSerial.
_, err = registerPeers(1, client)
require.NoError(t, err)
second, cancel2 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta())
defer cancel2()
require.NotNil(t, second.NetworkMap, "serial advance must force a full map even if meta is unchanged")
}

View File

@@ -0,0 +1,181 @@
package server
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/golang/protobuf/proto" //nolint:staticcheck // matches the generator
"github.com/stretchr/testify/require"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/encryption"
mgmtProto "github.com/netbirdio/netbird/shared/management/proto"
)
// sendWireFixture replays a frozen SyncRequest wire fixture as `peerKey` and
// returns the decoded first SyncResponse plus a cancel function. The caller
// must invoke cancel() so the server releases per-peer routines.
func sendWireFixture(t *testing.T, client mgmtProto.ManagementServiceClient, serverKey, peerKey wgtypes.Key, fixturePath string) (*mgmtProto.SyncResponse, context.CancelFunc) {
t.Helper()
raw, err := os.ReadFile(fixturePath)
require.NoError(t, err, "read fixture %s", fixturePath)
req := &mgmtProto.SyncRequest{}
require.NoError(t, proto.Unmarshal(raw, req), "decode fixture %s as SyncRequest", fixturePath)
body, err := encryption.EncryptMessage(serverKey, peerKey, req)
require.NoError(t, err, "encrypt sync request")
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.Sync(ctx, &mgmtProto.EncryptedMessage{
WgPubKey: peerKey.PublicKey().String(),
Body: body,
})
require.NoError(t, err, "open sync stream")
enc := &mgmtProto.EncryptedMessage{}
require.NoError(t, stream.RecvMsg(enc), "receive first sync response")
resp := &mgmtProto.SyncResponse{}
require.NoError(t, encryption.DecryptMessage(serverKey, peerKey, enc.Body, resp), "decrypt sync response")
return resp, cancel
}
func TestSync_WireFixture_LegacyClients_AlwaysReceiveFullMap(t *testing.T) {
skipOnWindows(t)
cases := []struct {
name string
fixture string
}{
{"v0.20.0 empty SyncRequest", "testdata/sync_request_wire/v0_20_0.bin"},
{"v0.40.0 SyncRequest with Meta", "testdata/sync_request_wire/v0_40_0.bin"},
{"v0.60.0 SyncRequest with Meta", "testdata/sync_request_wire/v0_60_0.bin"},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
abs, err := filepath.Abs(tc.fixture)
require.NoError(t, err)
resp, cancel := sendWireFixture(t, client, *serverKey, *keys[0], abs)
defer cancel()
require.NotNil(t, resp.NetworkMap, "legacy client first Sync must deliver a full NetworkMap")
require.NotNil(t, resp.NetbirdConfig, "legacy client first Sync must include NetbirdConfig")
})
}
}
func TestSync_WireFixture_LegacyClient_ReconnectStillGetsFullMap(t *testing.T) {
// v0.40.x clients call GrpcClient.GetNetworkMap on every OS during
// readInitialSettings and fail on a nil NetworkMap. Without extra opt-in
// signalling the server cannot tell a GetNetworkMap call apart from a main
// Sync, so its fast path would break these clients on reconnect. This test
// pins the currently accepted tradeoff: a legacy v0.40 client gets a full
// map on the first Sync, but a reconnect with an unchanged metaHash hits
// the primed cache and goes through the fast path. When a future proto
// opt-in lets the server distinguish these clients, this assertion must be
// tightened to require.NotNil(second.NetworkMap).
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
abs, err := filepath.Abs("testdata/sync_request_wire/v0_40_0.bin")
require.NoError(t, err)
first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
require.NotNil(t, first.NetworkMap, "first legacy sync receives full map and primes cache")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
defer cancel2()
require.Nil(t, second.NetworkMap, "documented legacy-reconnect tradeoff: warm cache entry causes fast path; update when proto opt-in lands")
require.NotNil(t, second.NetbirdConfig, "fast path still delivers NetbirdConfig")
}
func TestSync_WireFixture_AndroidReconnect_NeverSkips(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
abs, err := filepath.Abs("testdata/sync_request_wire/android_current.bin")
require.NoError(t, err)
first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
require.NotNil(t, first.NetworkMap, "android first sync must deliver a full map")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
defer cancel2()
require.NotNil(t, second.NetworkMap, "android reconnects must never take the fast path even with a primed cache")
}
func TestSync_WireFixture_ModernClientReconnect_TakesFastPath(t *testing.T) {
skipOnWindows(t)
mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t))
require.NoError(t, err)
defer cleanup()
defer mgmtServer.GracefulStop()
client, conn, err := createRawClient(addr)
require.NoError(t, err)
defer conn.Close()
keys, err := registerPeers(1, client)
require.NoError(t, err)
serverKey, err := getServerKey(client)
require.NoError(t, err)
abs, err := filepath.Abs("testdata/sync_request_wire/current.bin")
require.NoError(t, err)
first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
require.NotNil(t, first.NetworkMap, "modern first sync primes cache")
cancel1()
waitForPeerDisconnect(t, am, keys[0].PublicKey().String())
second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs)
defer cancel2()
require.Nil(t, second.NetworkMap, "modern reconnect with unchanged state must skip the NetworkMap")
require.NotNil(t, second.NetbirdConfig, "fast path still delivers NetbirdConfig")
}

View File

@@ -16,6 +16,7 @@ type GRPCMetrics struct {
meter metric.Meter
syncRequestsCounter metric.Int64Counter
syncRequestsBlockedCounter metric.Int64Counter
syncPathCounter metric.Int64Counter
loginRequestsCounter metric.Int64Counter
loginRequestsBlockedCounter metric.Int64Counter
loginRequestHighLatencyCounter metric.Int64Counter
@@ -51,6 +52,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
syncPathCounter, err := meter.Int64Counter("management.grpc.sync.path.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of sync requests by the path taken (fast vs slow). Slow-path rows carry a reason label for fast-path misses."),
)
if err != nil {
return nil, err
}
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
@@ -142,6 +151,7 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
meter: meter,
syncRequestsCounter: syncRequestsCounter,
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
syncPathCounter: syncPathCounter,
loginRequestsCounter: loginRequestsCounter,
loginRequestsBlockedCounter: loginRequestsBlockedCounter,
loginRequestHighLatencyCounter: loginRequestHighLatencyCounter,
@@ -173,6 +183,26 @@ func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() {
grpcMetrics.syncRequestsBlockedCounter.Add(grpcMetrics.ctx, 1)
}
// CountFastPathSync increments the sync-path counter for a Sync that took the
// fast path. Used together with CountSlowPathSync to graph the fast-path hit
// rate and, via the reason label on the slow-path counts, see where the
// misses go (android / cache_miss / serial_mismatch / meta_mismatch / ...).
func (grpcMetrics *GRPCMetrics) CountFastPathSync() {
grpcMetrics.syncPathCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String("path", "fast")))
}
// CountSlowPathSync increments the sync-path counter for a Sync that fell
// through to the slow path. reason is a short tag describing why the fast
// path was skipped; pass "" if the reason is unknown or the Sync never had
// a chance to attempt the fast path.
func (grpcMetrics *GRPCMetrics) CountSlowPathSync(reason string) {
attrs := []attribute.KeyValue{attribute.String("path", "slow")}
if reason != "" {
attrs = append(attrs, attribute.String("reason", reason))
}
grpcMetrics.syncPathCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attrs...))
}
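The reason tags listed above suggest a single classification point ahead of the two counter calls. A minimal, hypothetical sketch of such a classifier (the tag names follow the examples in the comment; the decision order is an assumption, not the actual handler logic):

```go
package main

import "fmt"

// classifySyncPath returns fast=true for a Sync that can take the fast
// path, or fast=false plus a short reason tag describing why it cannot.
// Tag names follow the examples in the metrics comment (android /
// cache_miss / serial_mismatch / meta_mismatch); the order is illustrative.
func classifySyncPath(goOS string, cacheHit, serialMatch, metaMatch bool) (fast bool, reason string) {
	switch {
	case goOS == "android":
		return false, "android" // android clients never take the fast path
	case !cacheHit:
		return false, "cache_miss"
	case !serialMatch:
		return false, "serial_mismatch"
	case !metaMatch:
		return false, "meta_mismatch"
	default:
		return true, ""
	}
}

func main() {
	fast, reason := classifySyncPath("linux", true, true, true)
	fmt.Println(fast, reason)
	fast, reason = classifySyncPath("android", true, true, true)
	fmt.Println(fast, reason)
}
```

A handler built this way would call CountFastPathSync when fast is true and CountSlowPathSync(reason) otherwise, so the fast-path hit rate and the miss breakdown come from one counter.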
// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API
func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() {
grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1)

View File

@@ -0,0 +1,2 @@
current.bin
android_current.bin

View File

@@ -0,0 +1,31 @@
# SyncRequest wire-format fixtures
These files are the byte-for-byte contents of the `SyncRequest` proto a netbird
client of each listed version would put on the wire. `sync_legacy_wire_test.go`
decodes each file, wraps it in the current `EncryptedMessage` envelope and
replays it through the in-process gRPC server to prove that the peer-sync fast
path does not break historical clients.
File | Client era | Notes
-----|-----------|------
`v0_20_0.bin` | v0.20.x | `message SyncRequest {}` — no fields on the wire. The main Sync loop in v0.20 tolerates a nil `NetworkMap`; the fixture is still expected to receive a full map, since an empty Sync payload is a cache miss that takes the slow path. **Checked in — frozen snapshot.**
`v0_40_0.bin` | v0.40.x | First release with `Meta` at tag 1. v0.40 calls `GrpcClient.GetNetworkMap` on every OS; fixture must continue to produce a full map. **Checked in — frozen snapshot.**
`v0_60_0.bin` | v0.60.x | Same SyncRequest shape as v0.40 but tagged with a newer `NetbirdVersion`. **Checked in — frozen snapshot.**
`current.bin` | latest | Fully-populated `PeerSystemMeta`. **Not checked in — regenerated at CI time by `generate.go`.**
`android_current.bin` | latest, Android | Same shape as `current.bin` with `GoOS=android`; the server must never take the fast path even after the cache is primed. **Not checked in — regenerated at CI time by `generate.go`.**
## Regenerating
`generate.go` writes only `current.bin` and `android_current.bin`. CI invokes it
before running the management test suite:
```sh
go run ./management/server/testdata/sync_request_wire/generate.go
```
Run the same command locally if you are running the wire tests by hand.
The three legacy fixtures are intentionally frozen. Do not regenerate them —
their value is that they survive proto changes unchanged, so a future proto
change that silently breaks the old wire format is caught by CI replaying the
frozen bytes and failing to decode them.
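Why a frozen fixture catches a tag renumbering: a protobuf field's wire tag is `(fieldNumber << 3) | wireType`, and `Meta` at field 1 (an embedded message, wire type 2) puts `0x0A` as the leading byte of the fixture. Renumbering the field changes that byte, so the frozen bytes no longer decode into the expected field. A sketch of the arithmetic:

```go
package main

import "fmt"

// wireTag computes the protobuf wire-format tag byte for small field
// numbers: (fieldNumber << 3) | wireType. Wire type 2 is length-delimited
// (strings, bytes, and embedded messages such as PeerSystemMeta).
func wireTag(fieldNumber, wireType int) byte {
	return byte(fieldNumber<<3 | wireType)
}

func main() {
	// Meta at field 1, wire type 2: the leading byte of every SyncRequest
	// fixture that carries Meta.
	fmt.Printf("0x%02X\n", wireTag(1, 2)) // prints 0x0A
	// If Meta were renumbered to field 2, the leading byte would change.
	fmt.Printf("0x%02X\n", wireTag(2, 2)) // prints 0x12
}
```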

View File

@@ -0,0 +1,73 @@
//go:build ignore
// generate.go produces the SyncRequest wire-format fixtures that the current
// netbird client (and the android variant) put on the wire. These two files
// are regenerated at CI time — run with:
//
// go run ./management/server/testdata/sync_request_wire/generate.go
//
// The legacy fixtures (v0_20_0.bin, v0_40_0.bin, v0_60_0.bin) are frozen
// snapshots of what older clients sent. They are checked in and intentionally
// never regenerated here, so a future proto change that silently breaks the
// old wire format is caught by CI replaying the frozen bytes.
package main
import (
"fmt"
"os"
"path/filepath"
"github.com/golang/protobuf/proto" //nolint:staticcheck // wire-format stability
mgmtProto "github.com/netbirdio/netbird/shared/management/proto"
)
func main() {
outDir := filepath.Join("management", "server", "testdata", "sync_request_wire")
if err := os.MkdirAll(outDir, 0o755); err != nil {
fmt.Fprintf(os.Stderr, "mkdir %s: %v\n", outDir, err)
os.Exit(1)
}
fixtures := map[string]*mgmtProto.SyncRequest{
// current: fully-populated meta a modern client would send.
"current.bin": {
Meta: &mgmtProto.PeerSystemMeta{
Hostname: "modern-host",
GoOS: "linux",
OS: "linux",
Platform: "x86_64",
Kernel: "6.5.0",
NetbirdVersion: "0.70.0",
UiVersion: "0.70.0",
KernelVersion: "6.5.0-rc1",
},
},
// android: exercises the never-skip branch regardless of cache state.
"android_current.bin": {
Meta: &mgmtProto.PeerSystemMeta{
Hostname: "android-host",
GoOS: "android",
OS: "android",
Platform: "arm64",
Kernel: "4.19",
NetbirdVersion: "0.70.0",
},
},
}
for name, msg := range fixtures {
payload, err := proto.Marshal(msg)
if err != nil {
fmt.Fprintf(os.Stderr, "marshal %s: %v\n", name, err)
os.Exit(1)
}
path := filepath.Join(outDir, name)
if err := os.WriteFile(path, payload, 0o644); err != nil {
fmt.Fprintf(os.Stderr, "write %s: %v\n", path, err)
os.Exit(1)
}
fmt.Printf("wrote %s (%d bytes)\n", path, len(payload))
}
}

View File

@@ -0,0 +1,3 @@
0
v40-hostlinux4.15.0*x86_642linux:0.40.0

View File

@@ -0,0 +1,3 @@
0
v60-hostlinux5.15.0*x86_642linux:0.60.0

View File

@@ -138,7 +138,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
if err != nil {
t.Fatal(err)
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}