diff --git a/client/cmd/testutil_test.go b/client/cmd/testutil_test.go index d7c48b1f2..39c592190 100644 --- a/client/cmd/testutil_test.go +++ b/client/cmd/testutil_test.go @@ -135,7 +135,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp if err != nil { t.Fatal(err) } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 936083074..98730bc4a 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -1671,7 +1671,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri if err != nil { return nil, "", err } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil) if err != nil { return nil, "", err } diff --git a/client/server/server_test.go b/client/server/server_test.go index f0b9abce3..59fea58f4 100644 --- a/client/server/server_test.go +++ b/client/server/server_test.go @@ -335,7 +335,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve if err != nil { return nil, "", err } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil) + 
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil, nil) if err != nil { return nil, "", err } diff --git a/management/internals/server/boot.go b/management/internals/server/boot.go index 4c47734fa..f1772fe71 100644 --- a/management/internals/server/boot.go +++ b/management/internals/server/boot.go @@ -165,7 +165,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server { gRPCAPIHandler := grpc.NewServer(gRPCOpts...) peerSerialCache := nbgrpc.NewPeerSerialCache(context.Background(), s.CacheStore(), nbgrpc.DefaultPeerSerialCacheTTL) fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), s.CacheStore(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathFlagKey) - srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag) + srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag, s.CacheStore()) if err != nil { log.Fatalf("failed to create management server: %v", err) } diff --git a/management/internals/shared/grpc/server.go b/management/internals/shared/grpc/server.go index b8df98731..9235e7c2d 100644 --- a/management/internals/shared/grpc/server.go +++ b/management/internals/shared/grpc/server.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "time" + cachestore "github.com/eko/gocache/lib/v4/store" pb "github.com/golang/protobuf/proto" // nolint "github.com/golang/protobuf/ptypes/timestamp" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip" @@ -93,12 +94,26 @@ type Server struct { // flag or a flag reporting disabled 
forces every Sync through the full // network map path. fastPathFlag *FastPathFlag + + // Secondary TTL-based caches used by the Sync fast path to skip DB reads + // for the account's ExtraSettings and a peer's group membership. Both + // are nil-safe and disabled if the shared cache store wasn't provided. + extraSettingsCache *extraSettingsCache + peerGroupsCache *peerGroupsCache + + // inflightMarkPeerConnected dedupes the fire-and-forget MarkPeerConnected + // writes kicked off by the fast path. Keys are peer pubkeys; presence + // means a background goroutine is already writing for that peer, so + // concurrent fast-path Syncs for the same peer coalesce to one write. + inflightMarkPeerConnected sync.Map } // NewServer creates a new Management server. peerSerialCache and fastPathFlag // are both optional; when either is nil or the flag reports disabled, the // Sync fast path is disabled and every request runs the full map computation, -// matching the pre-cache behaviour. +// matching the pre-cache behaviour. cacheStore is used to back the +// secondary fast-path caches (ExtraSettings, peer groups); +// a nil store silently disables those caches without affecting correctness. 
func NewServer( config *nbconfig.Config, accountManager account.Manager, @@ -112,6 +127,7 @@ func NewServer( oAuthConfigProvider idp.OAuthConfigProvider, peerSerialCache *PeerSerialCache, fastPathFlag *FastPathFlag, + cacheStore cachestore.StoreInterface, ) (*Server, error) { if appMetrics != nil { // update gauge based on number of connected peers which is equal to open gRPC streams @@ -162,6 +178,9 @@ func NewServer( peerSerialCache: peerSerialCache, fastPathFlag: fastPathFlag, + + extraSettingsCache: newExtraSettingsCache(context.Background(), cacheStore, DefaultExtraSettingsCacheTTL), + peerGroupsCache: newPeerGroupsCache(context.Background(), cacheStore, DefaultPeerGroupsCacheTTL), }, nil } diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index f8998f7ac..e630cc29f 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -17,7 +17,6 @@ import ( "github.com/netbirdio/netbird/management/internals/controllers/network_map" nbconfig "github.com/netbirdio/netbird/management/internals/server/config" nbpeer "github.com/netbirdio/netbird/management/server/peer" - "github.com/netbirdio/netbird/management/server/settings" "github.com/netbirdio/netbird/management/server/store" nbtypes "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/shared/management/proto" @@ -129,6 +128,12 @@ func shouldSkipNetworkMap(goOS string, hit bool, cached peerSyncEntry, currentSe return true } +// extraSettingsFetcher is the dependency used by buildFastPathResponse to +// obtain ExtraSettings for the peer's account. Matches the shape of the +// method on settings.Manager but as a plain function so production callers +// can wrap it with a cache and tests can inject a stub. 
+type extraSettingsFetcher func(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error) + // buildFastPathResponse constructs a SyncResponse containing only NetbirdConfig // with fresh TURN/Relay tokens, mirroring the shape used by // TimeBasedAuthSecretsManager when pushing token refreshes. The response omits @@ -138,7 +143,7 @@ func buildFastPathResponse( ctx context.Context, cfg *nbconfig.Config, secrets SecretsManager, - settingsMgr settings.Manager, + fetchExtraSettings extraSettingsFetcher, fetchGroups peerGroupFetcher, peer *nbpeer.Peer, ) *proto.SyncResponse { @@ -161,25 +166,23 @@ func buildFastPathResponse( } var extraSettings *nbtypes.ExtraSettings - extraSettingsStart := time.Now() - if es, err := settingsMgr.GetExtraSettings(ctx, peer.AccountID); err != nil { - log.WithContext(ctx).Debugf("fast path: get extra settings: %v", err) - } else { - extraSettings = es + if fetchExtraSettings != nil { + if es, err := fetchExtraSettings(ctx, peer.AccountID); err != nil { + log.WithContext(ctx).Debugf("fast path: get extra settings: %v", err) + } else { + extraSettings = es + } } - log.WithContext(ctx).Debugf("fast path: GetExtraSettings took %s", time.Since(extraSettingsStart)) nbConfig := toNetbirdConfig(cfg, turnToken, relayToken, extraSettings) var peerGroups []string if fetchGroups != nil { - start := time.Now() if ids, err := fetchGroups(ctx, peer.AccountID, peer.ID); err != nil { log.WithContext(ctx).Debugf("fast path: get peer group ids: %v", err) } else { peerGroups = ids } - log.WithContext(ctx).Debugf("fast path: get peer groups took %s", time.Since(start)) } extendStart := time.Now() @@ -189,6 +192,25 @@ func buildFastPathResponse( return &proto.SyncResponse{NetbirdConfig: nbConfig} } +// fetchExtraSettings returns a cached ExtraSettings when available, falling +// back to the settings manager on miss. Populates the cache on miss so +// subsequent fast-path Syncs hit it. 
+func (s *Server) fetchExtraSettings(ctx context.Context, accountID string) (*nbtypes.ExtraSettings, error) { + if es, ok := s.extraSettingsCache.get(accountID); ok { + log.WithContext(ctx).Debugf("fast path: GetExtraSettings skipped (cache hit)") + return es, nil + } + + start := time.Now() + es, err := s.settingsManager.GetExtraSettings(ctx, accountID) + if err != nil { + return nil, err + } + log.WithContext(ctx).Debugf("fast path: GetExtraSettings took %s", time.Since(start)) + s.extraSettingsCache.set(accountID, es) + return es, nil +} + // tryFastPathSync decides whether the current Sync can be answered with a // lightweight NetbirdConfig-only response. When the fast path runs, it takes // over the whole Sync handler (MarkPeerConnected, send, OnPeerConnected, @@ -299,15 +321,36 @@ func (s *Server) commitFastPath( } log.WithContext(ctx).Debugf("fast path: OnPeerConnectedWithPeer took %s", time.Since(onConnectedStart)) - markStart := time.Now() - if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil { - log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err) - } - log.WithContext(ctx).Debugf("fast path: MarkPeerConnected took %s", time.Since(markStart)) + s.markPeerConnectedAsync(peerKey.String(), realIP, accountID, syncStart) return peer, updates, true } +// markPeerConnectedAsync fires MarkPeerConnected in a detached goroutine so +// the Sync hot path does not wait on a DB write that can spike into the +// multi-second range under contention. LastSeen becomes eventually-consistent +// by at most one write; the peer's next Sync or the per-peer expiration +// routines correct any drift. Concurrent fast-path Syncs for the same peer +// coalesce to a single background write via the inflight map. 
+func (s *Server) markPeerConnectedAsync(peerKey string, realIP net.IP, accountID string, syncStart time.Time) { + if _, loaded := s.inflightMarkPeerConnected.LoadOrStore(peerKey, struct{}{}); loaded { + log.Debugf("fast path: async MarkPeerConnected for %s coalesced (already in flight)", peerKey) + return + } + go func() { + defer s.inflightMarkPeerConnected.Delete(peerKey) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + start := time.Now() + if err := s.accountManager.MarkPeerConnected(ctx, peerKey, true, realIP, accountID, syncStart); err != nil { + log.Warnf("fast path: async MarkPeerConnected for %s: %v", peerKey, err) + return + } + log.Debugf("fast path: async MarkPeerConnected for %s took %s", peerKey, time.Since(start)) + }() +} + // runFastPathSync executes the fast path: send the lean response, kick off // token refresh, release the per-peer lock, then block on handleUpdates until // the stream is closed. Peer lookup and subscription have already been @@ -365,7 +408,7 @@ func (s *Server) runFastPathSync( // sendFastPathResponse builds a NetbirdConfig-only SyncResponse, encrypts it // with the peer's WireGuard key and pushes it over the stream. func (s *Server) sendFastPathResponse(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, srv proto.ManagementService_SyncServer) error { - resp := buildFastPathResponse(ctx, s.config, s.secretsManager, s.settingsManager, s.fetchPeerGroups, peer) + resp := buildFastPathResponse(ctx, s.config, s.secretsManager, s.fetchExtraSettings, s.fetchPeerGroups, peer) key, err := s.secretsManager.GetWGKey() if err != nil { @@ -387,10 +430,23 @@ func (s *Server) sendFastPathResponse(ctx context.Context, peerKey wgtypes.Key, return nil } -// fetchPeerGroups is the dependency injected into buildFastPathResponse in -// production. A nil accountManager store is treated as "no groups". 
+// fetchPeerGroups returns a cached list of group IDs for the peer when +// available, falling back to the account manager's store on miss. Populates +// the cache on miss so subsequent fast-path Syncs hit it. func (s *Server) fetchPeerGroups(ctx context.Context, accountID, peerID string) ([]string, error) { - return s.accountManager.GetStore().GetPeerGroupIDs(ctx, store.LockingStrengthNone, accountID, peerID) + if ids, ok := s.peerGroupsCache.get(peerID); ok { + log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs skipped (cache hit)") + return ids, nil + } + + start := time.Now() + ids, err := s.accountManager.GetStore().GetPeerGroupIDs(ctx, store.LockingStrengthNone, accountID, peerID) + if err != nil { + return nil, err + } + log.WithContext(ctx).Debugf("fast path: GetPeerGroupIDs took %s", time.Since(start)) + s.peerGroupsCache.set(peerID, ids) + return ids, nil } // recordPeerSyncEntry writes the serial just delivered to this peer so a diff --git a/management/internals/shared/grpc/sync_fast_path_response_test.go b/management/internals/shared/grpc/sync_fast_path_response_test.go index 66b6bfe98..563543caf 100644 --- a/management/internals/shared/grpc/sync_fast_path_response_test.go +++ b/management/internals/shared/grpc/sync_fast_path_response_test.go @@ -68,7 +68,7 @@ func TestBuildFastPathResponse_TimeBasedTURNAndRelay_FreshTokens(t *testing.T) { settingsMock := settings.NewMockManager(ctrl) settingsMock.EXPECT().GetExtraSettings(gomock.Any(), "account-id").Return(&types.ExtraSettings{}, nil).AnyTimes() - resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer()) + resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer()) require.NotNil(t, resp, "response must not be nil") assert.Nil(t, resp.NetworkMap, "fast path must omit NetworkMap") @@ -116,7 +116,7 @@ func TestBuildFastPathResponse_StaticTURNCredentials(t *testing.T) 
{ settingsMock := settings.NewMockManager(ctrl) settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes() - resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer()) + resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer()) require.NotNil(t, resp.NetbirdConfig) require.Len(t, resp.NetbirdConfig.Turns, 1, "static TURN must appear in response") @@ -134,7 +134,7 @@ func TestBuildFastPathResponse_NoRelayConfigured_NoRelaySection(t *testing.T) { settingsMock := settings.NewMockManager(ctrl) settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes() - resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer()) + resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer()) require.NotNil(t, resp.NetbirdConfig, "NetbirdConfig must be non-nil even without relay/turn") assert.Nil(t, resp.NetbirdConfig.Relay, "Relay must be absent when not configured") assert.Empty(t, resp.NetbirdConfig.Turns, "Turns must be empty when not configured") @@ -149,7 +149,7 @@ func TestBuildFastPathResponse_ExtraSettingsErrorStillReturnsResponse(t *testing settingsMock := settings.NewMockManager(ctrl) settingsMock.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(nil, assertAnError).AnyTimes() - resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock, noGroupsFetcher, fastPathTestPeer()) + resp := buildFastPathResponse(context.Background(), cfg, secrets, settingsMock.GetExtraSettings, noGroupsFetcher, fastPathTestPeer()) require.NotNil(t, resp, "extra settings failure must degrade gracefully into an empty fast-path response") assert.Nil(t, resp.NetworkMap, "NetworkMap still omitted on degraded 
path") } diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index 48147fc06..bc57c6897 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -393,7 +393,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config peerSerialCache := nbgrpc.NewPeerSerialCache(ctx, cacheStore, time.Minute) fastPathFlag := nbgrpc.NewFastPathFlag(true) - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag, cacheStore) if err != nil { return nil, nil, "", cleanup, err } diff --git a/management/server/management_test.go b/management/server/management_test.go index 6396de6b6..3eea1b5a5 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -258,6 +258,7 @@ func startServer( nil, nil, nil, + nil, ) if err != nil { t.Fatalf("failed creating management server: %v", err) diff --git a/shared/management/client/client_test.go b/shared/management/client/client_test.go index bc566a058..7e088c003 100644 --- a/shared/management/client/client_test.go +++ b/shared/management/client/client_test.go @@ -138,7 +138,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { if err != nil { t.Fatal(err) } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil, 
nil) if err != nil { t.Fatal(err) }