Refactor sync fast path to introduce caching for ExtraSettings and peer groups, optimize MarkPeerConnected with async writes, and reduce DB round trips.

This commit is contained in:
mlsmaycon
2026-04-24 18:13:37 +02:00
parent ac6b73005d
commit 8c521a7cb5
10 changed files with 106 additions and 30 deletions

View File

@@ -135,7 +135,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
if err != nil {
t.Fatal(err)
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}

View File

@@ -1671,7 +1671,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
if err != nil {
return nil, "", err
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
return nil, "", err
}

View File

@@ -335,7 +335,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
if err != nil {
return nil, "", err
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
return nil, "", err
}

View File

@@ -165,7 +165,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
peerSerialCache := nbgrpc.NewPeerSerialCache(context.Background(), s.CacheStore(), nbgrpc.DefaultPeerSerialCacheTTL)
fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), s.CacheStore(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathFlagKey)
srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag)
srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag, s.CacheStore())
if err != nil {
log.Fatalf("failed to create management server: %v", err)
}

View File

@@ -14,6 +14,7 @@ import (
"sync/atomic"
"time"
cachestore "github.com/eko/gocache/lib/v4/store"
pb "github.com/golang/protobuf/proto" // nolint
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
@@ -93,12 +94,26 @@ type Server struct {
// flag or a flag reporting disabled forces every Sync through the full
// network map path.
fastPathFlag *FastPathFlag
// Secondary TTL-based caches used by the Sync fast path to skip DB reads
// for the account's ExtraSettings and a peer's group membership. Both
// are nil-safe and disabled if the shared cache store wasn't provided.
extraSettingsCache *extraSettingsCache
peerGroupsCache *peerGroupsCache
// inflightMarkPeerConnected dedupes the fire-and-forget MarkPeerConnected
// writes kicked off by the fast path. Keys are peer pubkeys; presence
// means a background goroutine is already writing for that peer, so
// concurrent fast-path Syncs for the same peer coalesce to one write.
inflightMarkPeerConnected sync.Map
}
// NewServer creates a new Management server. peerSerialCache and fastPathFlag
// are both optional; when either is nil or the flag reports disabled, the
// Sync fast path is disabled and every request runs the full map computation,
// matching the pre-cache behaviour.
// matching the pre-cache behaviour. cacheStore is used to back the
// secondary fast-path caches (ExtraSettings, peer groups); a nil store
// silently disables those caches without affecting correctness.
func NewServer(
config *nbconfig.Config,
accountManager account.Manager,
@@ -112,6 +127,7 @@ func NewServer(
oAuthConfigProvider idp.OAuthConfigProvider,
peerSerialCache *PeerSerialCache,
fastPathFlag *FastPathFlag,
cacheStore cachestore.StoreInterface,
) (*Server, error) {
if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams
@@ -162,6 +178,9 @@ func NewServer(
peerSerialCache: peerSerialCache,
fastPathFlag: fastPathFlag,
extraSettingsCache: newExtraSettingsCache(context.Background(), cacheStore, DefaultExtraSettingsCacheTTL),
peerGroupsCache: newPeerGroupsCache(context.Background(), cacheStore, DefaultPeerGroupsCacheTTL),
}, nil
}

View File

@@ -17,7 +17,6 @@ import (
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/store"
nbtypes "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
@@ -129,6 +128,12 @@ func shouldSkipNetworkMap(goOS string, hit bool, cached peerSyncEntry, currentSe
return true
}
// extraSettingsFetcher is the dependency used by buildFastPathResponse to
// obtain ExtraSettings for the peer's account. Matches the shape of the
// GetExtraSettings method on settings.Manager but as a plain function so
// production callers can wrap it with a cache and tests can inject a stub.
//
// buildFastPathResponse tolerates a nil fetcher (the lookup is skipped) and
// a non-nil error (logged at debug level); in both cases the response is
// still built, with nil ExtraSettings.
type extraSettingsFetcher func(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error)
// buildFastPathResponse constructs a SyncResponse containing only NetbirdConfig
// with fresh TURN/Relay tokens, mirroring the shape used by
// TimeBasedAuthSecretsManager when pushing token refreshes. The response omits
@@ -138,7 +143,7 @@ func buildFastPathResponse(
ctx context.Context,
cfg *nbconfig.Config,
secrets SecretsManager,
settingsMgr settings.Manager,
fetchExtraSettings extraSettingsFetcher,
fetchGroups peerGroupFetcher,
peer *nbpeer.Peer,
) *proto.SyncResponse {
@@ -161,25 +166,23 @@ func buildFastPathResponse(
}
var extraSettings *nbtypes.ExtraSettings
extraSettingsStart := time.Now()
if es, err := settingsMgr.GetExtraSettings(ctx, peer.AccountID); err != nil {
log.WithContext(ctx).Debugf("fast path: get extra settings: %v", err)
} else {
extraSettings = es
if fetchExtraSettings != nil {
if es, err := fetchExtraSettings(ctx, peer.AccountID); err != nil {
log.WithContext(ctx).Debugf("fast path: get extra settings: %v", err)
} else {
extraSettings = es
}
}
log.WithContext(ctx).Debugf("fast path: GetExtraSettings took %s", time.Since(extraSettingsStart))
nbConfig := toNetbirdConfig(cfg, turnToken, relayToken, extraSettings)
var peerGroups []string
if fetchGroups != nil {
start := time.Now()
if ids, err := fetchGroups(ctx, peer.AccountID, peer.ID); err != nil {
log.WithContext(ctx).Debugf("fast path: get peer group ids: %v", err)
} else {
peerGroups = ids
}
log.WithContext(ctx).Debugf("fast path: get peer groups took %s", time.Since(start))
}
extendStart := time.Now()
@@ -189,6 +192,25 @@ func buildFastPathResponse(
return &proto.SyncResponse{NetbirdConfig: nbConfig}
}
// fetchExtraSettings resolves the ExtraSettings for accountID, preferring the
// server's extraSettingsCache over the settings manager.
//
// On a cache hit the cached value is returned as-is. On a miss the value is
// loaded through settingsManager.GetExtraSettings, stored in the cache under
// accountID and then returned. A lookup error is returned unchanged and
// nothing is cached for it.
func (s *Server) fetchExtraSettings(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error) {
	cached, hit := s.extraSettingsCache.get(accountID)
	if hit {
		log.WithContext(ctx).Debugf("fast path: GetExtraSettings skipped (cache hit)")
		return cached, nil
	}

	// Cache miss: time the round trip to the settings manager so slow
	// lookups are visible in the debug log.
	began := time.Now()
	loaded, loadErr := s.settingsManager.GetExtraSettings(ctx, accountID)
	if loadErr != nil {
		return nil, loadErr
	}
	log.WithContext(ctx).Debugf("fast path: GetExtraSettings took %s", time.Since(began))

	s.extraSettingsCache.set(accountID, loaded)
	return loaded, nil
}
// tryFastPathSync decides whether the current Sync can be answered with a
// lightweight NetbirdConfig-only response. When the fast path runs, it takes
// over the whole Sync handler (MarkPeerConnected, send, OnPeerConnected,
@@ -299,15 +321,36 @@ func (s *Server) commitFastPath(
}
log.WithContext(ctx).Debugf("fast path: OnPeerConnectedWithPeer took %s", time.Since(onConnectedStart))
markStart := time.Now()
if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil {
log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err)
}
log.WithContext(ctx).Debugf("fast path: MarkPeerConnected took %s", time.Since(markStart))
s.markPeerConnectedAsync(peerKey.String(), realIP, accountID, syncStart)
return peer, updates, true
}
// markPeerConnectedAsync runs accountManager.MarkPeerConnected for peerKey in
// a background goroutine and returns immediately, so the caller never waits
// on that call.
//
// At most one background call per peerKey is in flight at a time: the key is
// registered in inflightMarkPeerConnected before the goroutine starts and is
// removed when it finishes. A call that finds the key already registered is
// dropped (not queued), so its realIP and syncStart values are never written.
//
// The background call uses a fresh context with a 30 second timeout derived
// from context.Background(), so it is not cancelled when the caller's request
// context ends. Failures are logged at warn level and are not reported back.
//
// NOTE(review): the stated rationale is that the peer's connection state
// becomes eventually consistent and is corrected by the peer's next Sync —
// confirm that a dropped or failed write is acceptable for every consumer of
// that state.
func (s *Server) markPeerConnectedAsync(peerKey string, realIP net.IP, accountID string, syncStart time.Time) {
	_, alreadyRunning := s.inflightMarkPeerConnected.LoadOrStore(peerKey, struct{}{})
	if alreadyRunning {
		log.Debugf("fast path: async MarkPeerConnected for %s coalesced (already in flight)", peerKey)
		return
	}

	write := func() {
		// Always release the in-flight slot, whether the write succeeded or not.
		defer s.inflightMarkPeerConnected.Delete(peerKey)

		bgCtx, release := context.WithTimeout(context.Background(), 30*time.Second)
		defer release()

		began := time.Now()
		err := s.accountManager.MarkPeerConnected(bgCtx, peerKey, true, realIP, accountID, syncStart)
		if err != nil {
			log.Warnf("fast path: async MarkPeerConnected for %s: %v", peerKey, err)
			return
		}
		log.Debugf("fast path: async MarkPeerConnected for %s took %s", peerKey, time.Since(began))
	}
	go write()
}
// runFastPathSync executes the fast path: send the lean response, kick off
// token refresh, release the per-peer lock, then block on handleUpdates until
// the stream is closed. Peer lookup and subscription have already been
@@ -365,7 +408,7 @@ func (s *Server) runFastPathSync(
// sendFastPathResponse builds a NetbirdConfig-only SyncResponse, encrypts it
// with the peer's WireGuard key and pushes it over the stream.
func (s *Server) sendFastPathResponse(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, srv proto.ManagementService_SyncServer) error {
resp := buildFastPathResponse(ctx, s.config, s.secretsManager, s.settingsManager, s.fetchPeerGroups, peer)
resp := buildFastPathResponse(ctx, s.config, s.secretsManager, s.fetchExtraSettings, s.fetchPeerGroups, peer)
key, err := s.secretsManager.GetWGKey()
if err != nil {
@@ -387,10 +430,23 @@ func (s *Server) sendFastPathResponse(ctx context.Context, peerKey wgtypes.Key,
return nil
}
// fetchPeerGroups is the group-ID lookup injected into buildFastPathResponse
// in production. It returns the cached list of group IDs for the peer when
// one is available and otherwise falls back to the account manager's store.
//
// On a cache miss the IDs are read with GetPeerGroupIDs using
// store.LockingStrengthNone, stored in peerGroupsCache and returned, so
// subsequent fast-path Syncs for the same peer hit the cache. A store error
// is returned unchanged and nothing is cached for it.
//
// The cache is keyed by peerID alone; accountID is only used for the store
// lookup.
func (s *Server) fetchPeerGroups(ctx context.Context, accountID, peerID string) ([]string, error) {
	if ids, ok := s.peerGroupsCache.get(peerID); ok {
		log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs skipped (cache hit)")
		return ids, nil
	}

	start := time.Now()
	ids, err := s.accountManager.GetStore().GetPeerGroupIDs(ctx, store.LockingStrengthNone, accountID, peerID)
	if err != nil {
		return nil, err
	}
	log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs took %s", time.Since(start))

	s.peerGroupsCache.set(peerID, ids)
	return ids, nil
}
// recordPeerSyncEntry writes the serial just delivered to this peer so a

View File

@@ -68,7 +68,7 @@ func TestBuildFastPathResponse_TimeBasedTURNAndRelay_FreshTokens(t *testing.T) {
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), "account-id").Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer())
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp, "response must not be nil")
assert.Nil(t, resp.NetworkMap, "fast path must omit NetworkMap")
@@ -116,7 +116,7 @@ func TestBuildFastPathResponse_StaticTURNCredentials(t *testing.T) {
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer())
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp.NetbirdConfig)
require.Len(t, resp.NetbirdConfig.Turns, 1, "static TURN must appear in response")
@@ -134,7 +134,7 @@ func TestBuildFastPathResponse_NoRelayConfigured_NoRelaySection(t *testing.T) {
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer())
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp.NetbirdConfig, "NetbirdConfig must be non-nil even without relay/turn")
assert.Nil(t, resp.NetbirdConfig.Relay, "Relay must be absent when not configured")
assert.Empty(t, resp.NetbirdConfig.Turns, "Turns must be empty when not configured")
@@ -149,7 +149,7 @@ func TestBuildFastPathResponse_ExtraSettingsErrorStillReturnsResponse(t *testing
settingsMock := settings.NewMockManager(ctrl)
settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(nil, assertAnError).AnyTimes()
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer())
resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer())
require.NotNil(t, resp, "extra settings failure must degrade gracefully into an empty fast-path response")
assert.Nil(t, resp.NetworkMap, "NetworkMap still omitted on degraded path")
}

View File

@@ -393,7 +393,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
peerSerialCache := nbgrpc.NewPeerSerialCache(ctx, cacheStore, time.Minute)
fastPathFlag := nbgrpc.NewFastPathFlag(true)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag, cacheStore)
if err != nil {
return nil, nil, "", cleanup, err
}

View File

@@ -258,6 +258,7 @@ func startServer(
nil,
nil,
nil,
nil,
)
if err != nil {
t.Fatalf("failed creating management server: %v", err)

View File

@@ -138,7 +138,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
if err != nil {
t.Fatal(err)
}
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil)
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil)
if err != nil {
t.Fatal(err)
}