diff --git a/management/internals/shared/fastpathcache/invalidator.go b/management/internals/shared/fastpathcache/invalidator.go new file mode 100644 index 000000000..fde38eb7d --- /dev/null +++ b/management/internals/shared/fastpathcache/invalidator.go @@ -0,0 +1,48 @@ +// Package fastpathcache exposes the key prefixes and delete helpers for the +// Sync fast-path caches so mutation sites outside the gRPC server package +// can invalidate stale entries without a circular import on the grpc +// package that owns the read-side cache wrappers. +package fastpathcache + +import ( + "context" + + "github.com/eko/gocache/lib/v4/cache" + cachestore "github.com/eko/gocache/lib/v4/store" + log "github.com/sirupsen/logrus" +) + +const ( + // ExtraSettingsKeyPrefix is the key prefix of the read-side + // extraSettingsCache in management/internals/shared/grpc, which aliases + // this constant so reads and invalidations always build the same key. + ExtraSettingsKeyPrefix = "extra-settings:" + + // PeerGroupsKeyPrefix matches the prefix used by the read-side + // peerGroupsCache in management/internals/shared/grpc. + PeerGroupsKeyPrefix = "peer-groups:" +) + +// InvalidateExtraSettings removes the cached ExtraSettings entry for the +// given account from the shared cache store. Safe to call with a nil store +// and safe to call when no entry exists. Errors are swallowed at debug level +// so mutation flows never fail because of a cache hiccup. +func InvalidateExtraSettings(ctx context.Context, store cachestore.StoreInterface, accountID string) { + if store == nil { + return + } + if err := cache.New[string](store).Delete(ctx, ExtraSettingsKeyPrefix+accountID); err != nil { + log.WithContext(ctx).Debugf("fastpathcache: invalidate extra settings for %s: %v", accountID, err) + } +} + +// InvalidatePeerGroups removes the cached peer-groups entry for a peer. Safe +// to call with a nil store and safe to call when no entry exists. 
+func InvalidatePeerGroups(ctx context.Context, store cachestore.StoreInterface, peerID string) { + if store == nil { + return + } + if err := cache.New[string](store).Delete(ctx, PeerGroupsKeyPrefix+peerID); err != nil { + log.WithContext(ctx).Debugf("fastpathcache: invalidate peer groups for %s: %v", peerID, err) + } +} diff --git a/management/internals/shared/grpc/fast_path_caches.go b/management/internals/shared/grpc/fast_path_caches.go index 560c4eb2b..a0370f033 100644 --- a/management/internals/shared/grpc/fast_path_caches.go +++ b/management/internals/shared/grpc/fast_path_caches.go @@ -9,12 +9,13 @@ import ( "github.com/eko/gocache/lib/v4/store" log "github.com/sirupsen/logrus" + "github.com/netbirdio/netbird/management/internals/shared/fastpathcache" nbtypes "github.com/netbirdio/netbird/management/server/types" ) const ( - extraSettingsCacheKeyPrefix = "extra-settings:" - peerGroupsCacheKeyPrefix = "peer-groups:" + extraSettingsCacheKeyPrefix = fastpathcache.ExtraSettingsKeyPrefix + peerGroupsCacheKeyPrefix = fastpathcache.PeerGroupsKeyPrefix // DefaultExtraSettingsCacheTTL bounds how long a cached ExtraSettings // blob survives. Settings rarely change; a ~30s window is cheap and diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index e630cc29f..fbda1ec78 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -109,6 +109,38 @@ func (s *Server) lookupPeerAuthFromCache(peerPubKey string, incomingMetaHash uin // - the cached serial matches the current account serial // - the cached meta hash matches the incoming meta hash // - the cached serial is non-zero (guard against uninitialised entries) + +// recordFastPathSkip emits a skip log and bumps the slow-path sync counter +// with a reason label. 
Used from the early-return sites in tryFastPathSync +// so the reason label on the sync-path counter breaks down fast-path misses by +// cause. +func (s *Server) recordFastPathSkip(ctx context.Context, reason string) { + log.WithContext(ctx).Debugf("fast path: skipped (reason=%s)", reason) + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountSlowPathSync(reason) + } +} + +// fastPathSkipReason returns a short reason tag when the eligibility check +// fails, or "" when the fast path should run. Mirrors shouldSkipNetworkMap's +// cache checks (the Android check is left to the caller) and names the +// failed condition so callers can count fast-path misses by cause. +func fastPathSkipReason(hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) string { + if !hit { + return "cache_miss" + } + if cached.Serial == 0 { + return "cached_serial_zero" + } + if cached.Serial != currentSerial { + return "serial_mismatch" + } + if cached.MetaHash != incomingMetaHash { + return "meta_mismatch" + } + return "" +} + func shouldSkipNetworkMap(goOS string, hit bool, cached peerSyncEntry, currentSerial, incomingMetaHash uint64) bool { if strings.EqualFold(goOS, "android") { return false @@ -230,27 +262,32 @@ func (s *Server) tryFastPathSync( unlock *func(), ) (took bool, err error) { if s.peerSerialCache == nil { + s.recordFastPathSkip(ctx, "cache_disabled") return false, nil } if !s.fastPathFlag.Enabled() { + s.recordFastPathSkip(ctx, "flag_off") return false, nil } if strings.EqualFold(peerMeta.GoOS, "android") { + s.recordFastPathSkip(ctx, "android") return false, nil } networkStart := time.Now() - network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) + currentSerial, err := s.accountManager.GetStore().GetAccountNetworkSerial(ctx, store.LockingStrengthNone, accountID) if err != nil { - log.WithContext(ctx).Debugf("fast path: lookup account network: %v", err) + log.WithContext(ctx).Debugf("fast path: account network serial lookup 
error: %v", err) + s.recordFastPathSkip(ctx, "account_network_error") return false, nil } - log.WithContext(ctx).Debugf("fast path: initial GetAccountNetwork took %s", time.Since(networkStart)) + log.WithContext(ctx).Debugf("fast path: initial GetAccountNetworkSerial took %s", time.Since(networkStart)) eligibilityStart := time.Now() cached, hit := s.peerSerialCache.Get(peerKey.String()) - if !shouldSkipNetworkMap(peerMeta.GoOS, hit, cached, network.CurrentSerial(), peerMetaHash) { - log.WithContext(ctx).Debugf("fast path: eligibility check (miss) took %s", time.Since(eligibilityStart)) + if reason := fastPathSkipReason(hit, cached, currentSerial, peerMetaHash); reason != "" { + log.WithContext(ctx).Debugf("fast path: eligibility check took %s", time.Since(eligibilityStart)) + s.recordFastPathSkip(ctx, reason) return false, nil } log.WithContext(ctx).Debugf("fast path: eligibility check (hit) took %s", time.Since(eligibilityStart)) @@ -261,6 +298,7 @@ func (s *Server) tryFastPathSync( } peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, cachedPeer) if !committed { + s.recordFastPathSkip(ctx, "commit_failed") return false, nil } @@ -397,6 +435,7 @@ func (s *Server) runFastPathSync( if s.appMetrics != nil { s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID) + s.appMetrics.GRPCMetrics().CountFastPathSync() } log.WithContext(ctx).Debugf("Sync (fast path) took %s", time.Since(reqStart)) diff --git a/management/server/account.go b/management/server/account.go index f7a8d54e9..2812ddb13 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -30,6 +30,7 @@ import ( "github.com/netbirdio/netbird/formatter/hook" "github.com/netbirdio/netbird/management/internals/controllers/network_map" nbconfig "github.com/netbirdio/netbird/management/internals/server/config" + "github.com/netbirdio/netbird/management/internals/shared/fastpathcache" 
"github.com/netbirdio/netbird/management/server/account" "github.com/netbirdio/netbird/management/server/activity" nbcache "github.com/netbirdio/netbird/management/server/cache" @@ -111,6 +112,11 @@ type DefaultAccountManager struct { permissionsManager permissions.Manager disableDefaultPolicy bool + + // sharedCacheStore is retained so mutation paths can invalidate the + // Sync fast-path caches (ExtraSettings, peer-groups) without a circular + // dependency on the gRPC server package that owns the read-side wrappers. + sharedCacheStore cacheStore.StoreInterface } var _ account.Manager = (*DefaultAccountManager)(nil) @@ -250,6 +256,7 @@ func BuildManager( am.externalCacheManager = nbcache.NewUserDataCache(sharedCacheStore) am.cacheManager = nbcache.NewAccountUserDataCache(am.loadAccount, sharedCacheStore) + am.sharedCacheStore = sharedCacheStore if !isNil(am.idpManager) && !IsEmbeddedIdp(am.idpManager) { go func() { @@ -368,6 +375,9 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco if err != nil { return nil, err } + if extraSettingsChanged { + fastpathcache.InvalidateExtraSettings(ctx, am.sharedCacheStore, accountID) + } am.handleRoutingPeerDNSResolutionSettings(ctx, oldSettings, newSettings, userID, accountID) am.handleLazyConnectionSettings(ctx, oldSettings, newSettings, userID, accountID) diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index d5c398c37..dd6540e27 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -2658,6 +2658,30 @@ func (s *SqlStore) GetAccountNetwork(ctx context.Context, lockStrength LockingSt return accountNetwork.Network, nil } +// GetAccountNetworkSerial returns only the network.serial column for an +// account, avoiding the overhead of materialising the full Network struct +// (which carries a JSON-serialised CIDR and other columns). 
Used by the Sync +// fast path to check whether the peer's cached serial still matches without +// paying the full-row read cost on contended DBs. +func (s *SqlStore) GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountID string) (uint64, error) { + tx := s.db + if lockStrength != LockingStrengthNone { + tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)}) + } + + var serial uint64 + if err := tx.Model(&types.Account{}). + Select("network_serial"). + Where(idQueryCondition, accountID). + Take(&serial).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return 0, status.NewAccountNotFoundError(accountID) + } + return 0, status.Errorf(status.Internal, "issue getting network serial from store: %s", err) + } + return serial, nil +} + func (s *SqlStore) GetPeerByPeerPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (*nbpeer.Peer, error) { tx := s.db if lockStrength != LockingStrengthNone { diff --git a/management/server/store/store.go b/management/server/store/store.go index 83b7c6286..861d6714a 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -188,6 +188,10 @@ type Store interface { GetTakenIPs(ctx context.Context, lockStrength LockingStrength, accountId string) ([]net.IP, error) IncrementNetworkSerial(ctx context.Context, accountId string) error GetAccountNetwork(ctx context.Context, lockStrength LockingStrength, accountId string) (*types.Network, error) + // GetAccountNetworkSerial returns only the network.serial column; used by + // the Sync fast path to skip the full-row GetAccountNetwork read when all + // we need is the serial. 
+ GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountId string) (uint64, error) GetInstallationID() string SaveInstallationID(ctx context.Context, ID string) error diff --git a/management/server/store/store_mock.go b/management/server/store/store_mock.go index 2e80394de..048d57518 100644 --- a/management/server/store/store_mock.go +++ b/management/server/store/store_mock.go @@ -1032,6 +1032,21 @@ func (mr *MockStoreMockRecorder) GetAccountNetwork(ctx, lockStrength, accountId return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountNetwork", reflect.TypeOf((*MockStore)(nil).GetAccountNetwork), ctx, lockStrength, accountId) } +// GetAccountNetworkSerial mocks base method. +func (m *MockStore) GetAccountNetworkSerial(ctx context.Context, lockStrength LockingStrength, accountId string) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAccountNetworkSerial", ctx, lockStrength, accountId) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAccountNetworkSerial indicates an expected call of GetAccountNetworkSerial. +func (mr *MockStoreMockRecorder) GetAccountNetworkSerial(ctx, lockStrength, accountId interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAccountNetworkSerial", reflect.TypeOf((*MockStore)(nil).GetAccountNetworkSerial), ctx, lockStrength, accountId) +} + // GetAccountNetworks mocks base method. 
func (m *MockStore) GetAccountNetworks(ctx context.Context, lockStrength LockingStrength, accountID string) ([]*types1.Network, error) { m.ctrl.T.Helper() diff --git a/management/server/telemetry/grpc_metrics.go b/management/server/telemetry/grpc_metrics.go index d3239c57a..7092a97f1 100644 --- a/management/server/telemetry/grpc_metrics.go +++ b/management/server/telemetry/grpc_metrics.go @@ -16,6 +16,7 @@ type GRPCMetrics struct { meter metric.Meter syncRequestsCounter metric.Int64Counter syncRequestsBlockedCounter metric.Int64Counter + syncPathCounter metric.Int64Counter loginRequestsCounter metric.Int64Counter loginRequestsBlockedCounter metric.Int64Counter loginRequestHighLatencyCounter metric.Int64Counter @@ -51,6 +52,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro return nil, err } + syncPathCounter, err := meter.Int64Counter("management.grpc.sync.path.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of sync requests by the path taken (fast vs slow). 
Slow-path rows carry a reason label for fast-path misses."), + ) + if err != nil { + return nil, err + } + loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter", metric.WithUnit("1"), metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"), @@ -142,6 +151,7 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro meter: meter, syncRequestsCounter: syncRequestsCounter, syncRequestsBlockedCounter: syncRequestsBlockedCounter, + syncPathCounter: syncPathCounter, loginRequestsCounter: loginRequestsCounter, loginRequestsBlockedCounter: loginRequestsBlockedCounter, loginRequestHighLatencyCounter: loginRequestHighLatencyCounter, @@ -173,6 +183,26 @@ func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() { grpcMetrics.syncRequestsBlockedCounter.Add(grpcMetrics.ctx, 1) } +// CountFastPathSync increments the sync-path counter for a Sync that took the +// fast path. Used together with CountSlowPathSync to graph the fast-path hit +// rate and, via the reason label on the slow-path counts, see where the +// misses go (android / cache_miss / serial_mismatch / meta_mismatch / ...). +func (grpcMetrics *GRPCMetrics) CountFastPathSync() { + grpcMetrics.syncPathCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String("path", "fast"))) +} + +// CountSlowPathSync increments the sync-path counter for a Sync that fell +// through to the slow path. reason is a short tag describing why the fast +// path was skipped; pass "" if the reason is unknown or the Sync never had +// a chance to attempt the fast path. 
+func (grpcMetrics *GRPCMetrics) CountSlowPathSync(reason string) { + attrs := []attribute.KeyValue{attribute.String("path", "slow")} + if reason != "" { + attrs = append(attrs, attribute.String("reason", reason)) + } + grpcMetrics.syncPathCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attrs...)) +} + // CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() { grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1)