From 06578127fd35b2c086ecfaea6eff4706ece02e18 Mon Sep 17 00:00:00 2001 From: mlsmaycon Date: Tue, 14 Apr 2026 19:57:31 +0200 Subject: [PATCH] Centralize cache store creation to reuse a single Redis connection pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each cache consumer (IDP cache, token store, PKCE store, secrets manager, EDR validator) was independently calling NewStore, creating separate Redis clients with their own connection pools — up to 1400 potential connections from a single management server process. Introduce a shared CacheStore() singleton on BaseServer that creates one store at boot and injects it into all consumers. Consumer constructors now receive a store.StoreInterface instead of creating their own. For Redis mode, all consumers share one connection pool (1000 max conns). For in-memory mode, all consumers share one GoCache instance. --- client/cmd/testutil_test.go | 16 ++++-- client/internal/engine_test.go | 10 +++- client/server/server_test.go | 10 +++- .../service/manager/manager_test.go | 27 +++++----- management/internals/server/boot.go | 25 +++++---- management/internals/server/controllers.go | 3 +- management/internals/server/modules.go | 2 +- .../internals/shared/grpc/onetime_token.go | 13 ++--- .../internals/shared/grpc/pkce_verifier.go | 13 ++--- .../internals/shared/grpc/proxy_test.go | 52 ++++++++----------- .../shared/grpc/validate_session_test.go | 23 +++++--- management/server/account.go | 13 ++--- management/server/account_test.go | 7 ++- management/server/cache/store.go | 9 ++++ management/server/dns_test.go | 9 +++- .../proxy/auth_callback_integration_test.go | 7 +-- .../testing/testing_tools/channel/channel.go | 37 ++++++------- management/server/identity_provider_test.go | 9 +++- management/server/management_proto_test.go | 9 +++- management/server/management_test.go | 10 +++- management/server/nameserver_test.go | 9 +++- management/server/peer_test.go | 25 +++++++-- management/server/route_test.go | 9 +++- proxy/management_integration_test.go | 7 +-- shared/management/client/client_test.go | 15 ++++-- 25 files changed, 235 insertions(+), 134 deletions(-) diff --git a/client/cmd/testutil_test.go b/client/cmd/testutil_test.go index 4bda33e65..eed96c3a8 100644 --- a/client/cmd/testutil_test.go +++ b/client/cmd/testutil_test.go @@ -13,6 +13,8 @@ import ( "github.com/netbirdio/management-integrations/integrations" + nbcache "github.com/netbirdio/netbird/management/server/cache" + "github.com/netbirdio/netbird/management/internals/controllers/network_map/controller" "github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel" "github.com/netbirdio/netbird/management/internals/modules/peers" @@ -100,9 +102,16 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp jobManager := job.NewJobManager(nil, store, peersmanager) - iv, _ := integrations.NewIntegratedValidator(context.Background(), peersmanager, settingsManagerMock, eventStore) + ctx := context.Background() - metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + t.Fatal(err) + } + + iv, _ := integrations.NewIntegratedValidator(ctx, peersmanager, settingsManagerMock, eventStore, cacheStore) + + metrics, err := telemetry.NewDefaultAppMetrics(ctx) require.NoError(t, err) settingsMockManager := settings.NewMockManager(ctrl) @@ -113,12 +122,11 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp Return(&types.Settings{}, nil). AnyTimes() - ctx := context.Background() updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), manager.NewEphemeralManager(store, peersmanager), config) - accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false) + accountManager, err := mgmt.BuildManager(ctx, config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false, cacheStore) if err != nil { t.Fatal(err) } diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 1f6fe384a..9fa4e51b2 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -55,6 +55,7 @@ import ( nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/management/server" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/permissions" "github.com/netbirdio/netbird/management/server/settings" @@ -1634,7 +1635,12 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri peersManager := peers.NewManager(store, permissionsManager) jobManager := job.NewJobManager(nil, store, peersManager) - ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, nil, eventStore) + cacheStore, err := nbcache.NewStore(context.Background(), 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, "", err + } + + ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, nil, eventStore, cacheStore) metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) require.NoError(t, err) @@ -1656,7 +1662,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := server.NewAccountRequestBuffer(context.Background(), store) networkMapController := controller.NewController(context.Background(), store, metrics, updateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), manager.NewEphemeralManager(store, peersManager), config) - accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) if err != nil { return nil, "", err } diff --git a/client/server/server_test.go b/client/server/server_test.go index 6de23d501..437c4bbc8 100644 --- a/client/server/server_test.go +++ b/client/server/server_test.go @@ -36,6 +36,7 @@ import ( daemonProto "github.com/netbirdio/netbird/client/proto" "github.com/netbirdio/netbird/management/server" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/permissions" "github.com/netbirdio/netbird/management/server/settings" @@ -309,7 +310,12 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve jobManager := job.NewJobManager(nil, store, peersManager) - ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, settingsManagerMock, eventStore) + cacheStore, err := nbcache.NewStore(context.Background(), 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, "", err + } + + ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, settingsManagerMock, eventStore, cacheStore) metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) require.NoError(t, err) @@ -320,7 +326,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve requestBuffer := server.NewAccountRequestBuffer(context.Background(), store) peersUpdateManager := update_channel.NewPeersUpdateManager(metrics) networkMapController := controller.NewController(context.Background(), store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), manager.NewEphemeralManager(store, peersManager), config) - accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false) + accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false, cacheStore) if err != nil { return nil, "", err } diff --git a/management/internals/modules/reverseproxy/service/manager/manager_test.go b/management/internals/modules/reverseproxy/service/manager/manager_test.go index 69d48f10a..54ac8ab18 100644 --- a/management/internals/modules/reverseproxy/service/manager/manager_test.go +++ b/management/internals/modules/reverseproxy/service/manager/manager_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + cachestore "github.com/eko/gocache/lib/v4/store" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -18,6 +19,7 @@ import ( nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" "github.com/netbirdio/netbird/management/server/account" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/mock_server" resourcetypes "github.com/netbirdio/netbird/management/server/networks/resources/types" nbpeer "github.com/netbirdio/netbird/management/server/peer" @@ -29,6 +31,13 @@ import ( "github.com/netbirdio/netbird/shared/management/status" ) +func testCacheStore(t *testing.T) cachestore.StoreInterface { + t.Helper() + s, err := nbcache.NewStore(context.Background(), 30*time.Minute, 10*time.Minute, 100) + require.NoError(t, err) + return s +} + func TestInitializeServiceForCreate(t *testing.T) { ctx := context.Background() accountID := "test-account" @@ -422,10 +431,8 @@ func TestDeletePeerService_SourcePeerValidation(t *testing.T) { newProxyServer := func(t *testing.T) *nbgrpc.ProxyServiceServer { t.Helper() - tokenStore, err := nbgrpc.NewOneTimeTokenStore(context.Background(), 1*time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - pkceStore, err := nbgrpc.NewPKCEVerifierStore(context.Background(), 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := nbgrpc.NewOneTimeTokenStore(context.Background(), testCacheStore(t)) + pkceStore := nbgrpc.NewPKCEVerifierStore(context.Background(), testCacheStore(t)) srv := nbgrpc.NewProxyServiceServer(nil, tokenStore, pkceStore, nbgrpc.ProxyOIDCConfig{}, nil, nil, nil) return srv } @@ -703,10 +710,8 @@ func setupIntegrationTest(t *testing.T) (*Manager, store.Store) { }, } - tokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, 1*time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - pkceStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := nbgrpc.NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := nbgrpc.NewPKCEVerifierStore(ctx, testCacheStore(t)) proxySrv := nbgrpc.NewProxyServiceServer(nil, tokenStore, pkceStore, nbgrpc.ProxyOIDCConfig{}, nil, nil, nil) proxyController, err := proxymanager.NewGRPCController(proxySrv, noop.NewMeterProvider().Meter("")) @@ -1128,10 +1133,8 @@ func TestDeleteService_DeletesTargets(t *testing.T) { mockPerms := permissions.NewMockManager(ctrl) mockAcct := account.NewMockManager(ctrl) - tokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, 1*time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - pkceStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := nbgrpc.NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := nbgrpc.NewPKCEVerifierStore(ctx, testCacheStore(t)) proxySrv := nbgrpc.NewProxyServiceServer(nil, tokenStore, pkceStore, nbgrpc.ProxyOIDCConfig{}, nil, nil, nil) proxyController, err := proxymanager.NewGRPCController(proxySrv, noop.NewMeterProvider().Meter("")) diff --git a/management/internals/server/boot.go b/management/internals/server/boot.go index 88d37ca80..24dfb641b 100644 --- a/management/internals/server/boot.go +++ b/management/internals/server/boot.go @@ -18,6 +18,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + cachestore "github.com/eko/gocache/lib/v4/store" "github.com/netbirdio/management-integrations/integrations" "github.com/netbirdio/netbird/encryption" @@ -26,6 +27,7 @@ import ( accesslogsmanager "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/accesslogs/manager" nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" nbContext "github.com/netbirdio/netbird/management/server/context" nbhttp "github.com/netbirdio/netbird/management/server/http" "github.com/netbirdio/netbird/management/server/store" @@ -58,6 +60,18 @@ func (s *BaseServer) Metrics() telemetry.AppMetrics { }) } +// CacheStore returns a shared cache store backed by Redis or in-memory depending on the environment. +// All consumers should reuse this store to avoid creating multiple Redis connections. +func (s *BaseServer) CacheStore() cachestore.StoreInterface { + return Create(s, func() cachestore.StoreInterface { + cs, err := nbcache.NewStore(context.Background(), nbcache.DefaultStoreMaxTimeout, nbcache.DefaultStoreCleanupInterval, nbcache.DefaultStoreMaxConn) + if err != nil { + log.Fatalf("failed to create shared cache store: %v", err) + } + return cs + }) +} + func (s *BaseServer) Store() store.Store { return Create(s, func() store.Store { store, err := store.NewStore(context.Background(), s.Config.StoreConfig.Engine, s.Config.Datadir, s.Metrics(), false) @@ -195,10 +209,7 @@ func (s *BaseServer) proxyOIDCConfig() nbgrpc.ProxyOIDCConfig { func (s *BaseServer) ProxyTokenStore() *nbgrpc.OneTimeTokenStore { return Create(s, func() *nbgrpc.OneTimeTokenStore { - tokenStore, err := nbgrpc.NewOneTimeTokenStore(context.Background(), 5*time.Minute, 10*time.Minute, 100) - if err != nil { - log.Fatalf("failed to create proxy token store: %v", err) - } + tokenStore := nbgrpc.NewOneTimeTokenStore(context.Background(), s.CacheStore()) log.Info("One-time token store initialized for proxy authentication") return tokenStore }) @@ -206,11 +217,7 @@ func (s *BaseServer) ProxyTokenStore() *nbgrpc.OneTimeTokenStore { func (s *BaseServer) PKCEVerifierStore() *nbgrpc.PKCEVerifierStore { return Create(s, func() *nbgrpc.PKCEVerifierStore { - pkceStore, err := nbgrpc.NewPKCEVerifierStore(context.Background(), 10*time.Minute, 10*time.Minute, 100) - if err != nil { - log.Fatalf("failed to create PKCE verifier store: %v", err) - } - return pkceStore + return nbgrpc.NewPKCEVerifierStore(context.Background(), s.CacheStore()) }) } diff --git a/management/internals/server/controllers.go b/management/internals/server/controllers.go index c7eab3d19..9a8e45d33 100644 --- a/management/internals/server/controllers.go +++ b/management/internals/server/controllers.go @@ -41,7 +41,8 @@ func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValida context.Background(), s.PeersManager(), s.SettingsManager(), - s.EventStore()) + s.EventStore(), + s.CacheStore()) if err != nil { log.Errorf("failed to create integrated peer validator: %v", err) } diff --git a/management/internals/server/modules.go b/management/internals/server/modules.go index 374ea5c81..9b2ec2989 100644 --- a/management/internals/server/modules.go +++ b/management/internals/server/modules.go @@ -100,7 +100,7 @@ func (s *BaseServer) PeersManager() peers.Manager { func (s *BaseServer) AccountManager() account.Manager { return Create(s, func() account.Manager { - accountManager, err := server.BuildManager(context.Background(), s.Config, s.Store(), s.NetworkMapController(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.Config.DisableDefaultPolicy) + accountManager, err := server.BuildManager(context.Background(), s.Config, s.Store(), s.NetworkMapController(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.Config.DisableDefaultPolicy, s.CacheStore()) if err != nil { log.Fatalf("failed to create account service: %v", err) } diff --git a/management/internals/shared/grpc/onetime_token.go b/management/internals/shared/grpc/onetime_token.go index 7999407db..acfd6eafb 100644 --- a/management/internals/shared/grpc/onetime_token.go +++ b/management/internals/shared/grpc/onetime_token.go @@ -14,8 +14,6 @@ import ( "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" log "github.com/sirupsen/logrus" - - nbcache "github.com/netbirdio/netbird/management/server/cache" ) type tokenMetadata struct { @@ -32,17 +30,12 @@ type OneTimeTokenStore struct { ctx context.Context } -// NewOneTimeTokenStore creates a token store with automatic backend selection -func NewOneTimeTokenStore(ctx context.Context, maxTimeout, cleanupInterval time.Duration, maxConn int) (*OneTimeTokenStore, error) { - cacheStore, err := nbcache.NewStore(ctx, maxTimeout, cleanupInterval, maxConn) - if err != nil { - return nil, fmt.Errorf("failed to create cache store: %w", err) - } - +// NewOneTimeTokenStore creates a token store using the provided shared cache store. +func NewOneTimeTokenStore(ctx context.Context, cacheStore store.StoreInterface) *OneTimeTokenStore { return &OneTimeTokenStore{ cache: cache.New[string](cacheStore), ctx: ctx, - }, nil + } } // GenerateToken creates a new cryptographically secure one-time token diff --git a/management/internals/shared/grpc/pkce_verifier.go b/management/internals/shared/grpc/pkce_verifier.go index 441e8b051..a1325256c 100644 --- a/management/internals/shared/grpc/pkce_verifier.go +++ b/management/internals/shared/grpc/pkce_verifier.go @@ -8,8 +8,6 @@ import ( "github.com/eko/gocache/lib/v4/cache" "github.com/eko/gocache/lib/v4/store" log "github.com/sirupsen/logrus" - - nbcache "github.com/netbirdio/netbird/management/server/cache" ) // PKCEVerifierStore manages PKCE verifiers for OAuth flows. @@ -19,17 +17,12 @@ type PKCEVerifierStore struct { ctx context.Context } -// NewPKCEVerifierStore creates a PKCE verifier store with automatic backend selection -func NewPKCEVerifierStore(ctx context.Context, maxTimeout, cleanupInterval time.Duration, maxConn int) (*PKCEVerifierStore, error) { - cacheStore, err := nbcache.NewStore(ctx, maxTimeout, cleanupInterval, maxConn) - if err != nil { - return nil, fmt.Errorf("failed to create cache store: %w", err) - } - +// NewPKCEVerifierStore creates a PKCE verifier store using the provided shared cache store. +func NewPKCEVerifierStore(ctx context.Context, cacheStore store.StoreInterface) *PKCEVerifierStore { return &PKCEVerifierStore{ cache: cache.New[string](cacheStore), ctx: ctx, - }, nil + } } // Store saves a PKCE verifier associated with an OAuth state parameter. diff --git a/management/internals/shared/grpc/proxy_test.go b/management/internals/shared/grpc/proxy_test.go index d5aed3dee..de4e96d93 100644 --- a/management/internals/shared/grpc/proxy_test.go +++ b/management/internals/shared/grpc/proxy_test.go @@ -9,13 +9,22 @@ import ( "testing" "time" + cachestore "github.com/eko/gocache/lib/v4/store" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/shared/management/proto" ) +func testCacheStore(t *testing.T) cachestore.StoreInterface { + t.Helper() + s, err := nbcache.NewStore(context.Background(), 30*time.Minute, 10*time.Minute, 100) + require.NoError(t, err) + return s +} + type testProxyController struct { mu sync.Mutex clusterProxies map[string]map[string]struct{} @@ -114,11 +123,8 @@ func drainEmpty(ch chan *proto.GetMappingUpdateResponse) bool { func TestSendServiceUpdateToCluster_UniqueTokensPerProxy(t *testing.T) { ctx := context.Background() - tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ tokenStore: tokenStore, @@ -174,11 +180,8 @@ func TestSendServiceUpdateToCluster_UniqueTokensPerProxy(t *testing.T) { func TestSendServiceUpdateToCluster_DeleteNoToken(t *testing.T) { ctx := context.Background() - tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ tokenStore: tokenStore, @@ -211,11 +214,8 @@ func TestSendServiceUpdateToCluster_DeleteNoToken(t *testing.T) { func TestSendServiceUpdate_UniqueTokensPerProxy(t *testing.T) { ctx := context.Background() - tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100) - require.NoError(t, err) - - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ tokenStore: tokenStore, @@ -267,8 +267,7 @@ func generateState(s *ProxyServiceServer, redirectURL string) string { func TestOAuthState_NeverTheSame(t *testing.T) { ctx := context.Background() - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ oidcConfig: ProxyOIDCConfig{ @@ -296,8 +295,7 @@ func TestOAuthState_NeverTheSame(t *testing.T) { func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) { ctx := context.Background() - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ oidcConfig: ProxyOIDCConfig{ @@ -307,7 +305,7 @@ func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) { } // Old format had only 2 parts: base64(url)|hmac - err = s.pkceVerifierStore.Store("base64url|hmac", "test", 10*time.Minute) + err := s.pkceVerifierStore.Store("base64url|hmac", "test", 10*time.Minute) require.NoError(t, err) _, _, err = s.ValidateState("base64url|hmac") @@ -317,8 +315,7 @@ func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) { func TestValidateState_RejectsInvalidHMAC(t *testing.T) { ctx := context.Background() - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) s := &ProxyServiceServer{ oidcConfig: ProxyOIDCConfig{ @@ -328,7 +325,7 @@ func TestValidateState_RejectsInvalidHMAC(t *testing.T) { } // Store with tampered HMAC - err = s.pkceVerifierStore.Store("dGVzdA==|nonce|wrong-hmac", "test", 10*time.Minute) + err := s.pkceVerifierStore.Store("dGVzdA==|nonce|wrong-hmac", "test", 10*time.Minute) require.NoError(t, err) _, _, err = s.ValidateState("dGVzdA==|nonce|wrong-hmac") @@ -337,8 +334,7 @@ func TestValidateState_RejectsInvalidHMAC(t *testing.T) { } func TestSendServiceUpdateToCluster_FiltersOnCapability(t *testing.T) { - tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t)) s := &ProxyServiceServer{ tokenStore: tokenStore, @@ -410,8 +406,7 @@ func TestSendServiceUpdateToCluster_FiltersOnCapability(t *testing.T) { } func TestSendServiceUpdateToCluster_TLSNotFiltered(t *testing.T) { - tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t)) s := &ProxyServiceServer{ tokenStore: tokenStore, @@ -442,8 +437,7 @@ func TestSendServiceUpdateToCluster_TLSNotFiltered(t *testing.T) { // scenario for an existing service, verifying the correct update types // reach the correct clusters. func TestServiceModifyNotifications(t *testing.T) { - tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t)) newServer := func() (*ProxyServiceServer, map[string]chan *proto.GetMappingUpdateResponse) { s := &ProxyServiceServer{ diff --git a/management/internals/shared/grpc/validate_session_test.go b/management/internals/shared/grpc/validate_session_test.go index 2f77de86e..d1d7fc8b7 100644 --- a/management/internals/shared/grpc/validate_session_test.go +++ b/management/internals/shared/grpc/validate_session_test.go @@ -39,11 +39,8 @@ func setupValidateSessionTest(t *testing.T) *validateSessionTestSetup { usersManager := &testValidateSessionUsersManager{store: testStore} proxyManager := &testValidateSessionProxyManager{} - tokenStore, err := NewOneTimeTokenStore(ctx, time.Minute, 10*time.Minute, 100) - require.NoError(t, err) - - pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t)) + pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t)) proxyService := NewProxyServiceServer(nil, tokenStore, pkceStore, ProxyOIDCConfig{}, nil, usersManager, proxyManager) proxyService.SetServiceManager(serviceManager) @@ -327,7 +324,7 @@ func (m *testValidateSessionServiceManager) GetActiveClusters(_ context.Context, type testValidateSessionProxyManager struct{} -func (m *testValidateSessionProxyManager) Connect(_ context.Context, _, _, _ string) error { +func (m *testValidateSessionProxyManager) Connect(_ context.Context, _, _, _ string, _ *proxy.Capabilities) error { return nil } @@ -335,7 +332,7 @@ func (m *testValidateSessionProxyManager) Disconnect(_ context.Context, _ string return nil } -func (m *testValidateSessionProxyManager) Heartbeat(_ context.Context, _ string) error { +func (m *testValidateSessionProxyManager) Heartbeat(_ context.Context, _, _, _ string) error { return nil } @@ -351,6 +348,18 @@ func (m *testValidateSessionProxyManager) CleanupStale(_ context.Context, _ time return nil } +func (m *testValidateSessionProxyManager) ClusterSupportsCustomPorts(_ context.Context, _ string) *bool { + return nil +} + +func (m *testValidateSessionProxyManager) ClusterRequireSubdomain(_ context.Context, _ string) *bool { + return nil +} + +func (m *testValidateSessionProxyManager) ClusterSupportsCrowdSec(_ context.Context, _ string) *bool { + return nil +} + type testValidateSessionUsersManager struct { store store.Store } diff --git a/management/server/account.go b/management/server/account.go index d90b46659..7d53cef03 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -181,7 +181,7 @@ func (am *DefaultAccountManager) getJWTGroupsChanges(user *types.User, groups [] return modified, newUserAutoGroups, newGroupsToCreate, nil } -// BuildManager creates a new DefaultAccountManager with a provided Store +// BuildManager creates a new DefaultAccountManager with all dependencies. func BuildManager( ctx context.Context, config *nbconfig.Config, @@ -199,6 +199,7 @@ func BuildManager( settingsManager settings.Manager, permissionsManager permissions.Manager, disableDefaultPolicy bool, + sharedCacheStore cacheStore.StoreInterface, ) (*DefaultAccountManager, error) { start := time.Now() defer func() { @@ -247,16 +248,12 @@ func BuildManager( log.WithContext(ctx).Infof("single account mode disabled, accounts number %d", accountsCounter) } - cacheStore, err := nbcache.NewStore(ctx, nbcache.DefaultIDPCacheExpirationMax, nbcache.DefaultIDPCacheCleanupInterval, nbcache.DefaultIDPCacheOpenConn) - if err != nil { - return nil, fmt.Errorf("getting cache store: %s", err) - } - am.externalCacheManager = nbcache.NewUserDataCache(cacheStore) - am.cacheManager = nbcache.NewAccountUserDataCache(am.loadAccount, cacheStore) + am.externalCacheManager = nbcache.NewUserDataCache(sharedCacheStore) + am.cacheManager = nbcache.NewAccountUserDataCache(am.loadAccount, sharedCacheStore) if !isNil(am.idpManager) && !IsEmbeddedIdp(am.idpManager) { go func() { - err := am.warmupIDPCache(ctx, cacheStore) + err := am.warmupIDPCache(ctx, sharedCacheStore) if err != nil { log.WithContext(ctx).Warnf("failed warming up cache due to error: %v", err) // todo retry? diff --git a/management/server/account_test.go b/management/server/account_test.go index 2f0533281..4453d064e 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -3134,10 +3134,15 @@ func createManager(t testing.TB) (*DefaultAccountManager, *update_channel.PeersU ctx := context.Background() + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, nil, err + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peers.NewManager(store, permissionsManager)), &config.Config{}) - manager, err := BuildManager(ctx, &config.Config{}, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + manager, err := BuildManager(ctx, &config.Config{}, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) if err != nil { return nil, nil, err } diff --git a/management/server/cache/store.go b/management/server/cache/store.go index e232a58c3..a4e49ecb2 100644 --- a/management/server/cache/store.go +++ b/management/server/cache/store.go @@ -22,6 +22,15 @@ const RedisStoreEnvVar = "NB_CACHE_REDIS_ADDRESS" // legacyIdPCacheRedisEnvVar is the previous environment variable used for IDP cache. const legacyIdPCacheRedisEnvVar = "NB_IDP_CACHE_REDIS_ADDRESS" +const ( + // DefaultStoreMaxTimeout is the default max timeout for the shared cache store. + DefaultStoreMaxTimeout = 7 * 24 * time.Hour + // DefaultStoreCleanupInterval is the default cleanup interval for the shared cache store. + DefaultStoreCleanupInterval = 30 * time.Minute + // DefaultStoreMaxConn is the default max connections for the shared cache store. + DefaultStoreMaxConn = 1000 +) + // NewStore creates a new cache store with the given max timeout and cleanup interval. It checks for the environment Variable RedisStoreEnvVar // to determine if a redis store should be used. If the environment variable is set, it will attempt to connect to the redis store. func NewStore(ctx context.Context, maxTimeout, cleanupInterval time.Duration, maxConn int) (store.StoreInterface, error) { diff --git a/management/server/dns_test.go b/management/server/dns_test.go index bd0755d0d..0e37a3b22 100644 --- a/management/server/dns_test.go +++ b/management/server/dns_test.go @@ -15,6 +15,7 @@ import ( "github.com/netbirdio/netbird/management/internals/modules/peers" ephemeral_manager "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral/manager" "github.com/netbirdio/netbird/management/internals/server/config" + "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" "github.com/netbirdio/netbird/management/server/permissions" @@ -225,11 +226,17 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) { peersManager := peers.NewManager(store, permissionsManager) ctx := context.Background() + + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, err + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.test", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peers.NewManager(store, permissionsManager)), &config.Config{}) - return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) } func createDNSStore(t *testing.T) (store.Store, error) { diff --git a/management/server/http/handlers/proxy/auth_callback_integration_test.go b/management/server/http/handlers/proxy/auth_callback_integration_test.go index 922bf4352..c99acab63 100644 --- a/management/server/http/handlers/proxy/auth_callback_integration_test.go +++ b/management/server/http/handlers/proxy/auth_callback_integration_test.go @@ -22,6 +22,7 @@ import ( nbproxy "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy" "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service" nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/users" @@ -191,11 +192,11 @@ func setupAuthCallbackTest(t *testing.T) *testSetup { oidcServer := newFakeOIDCServer() - tokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, time.Minute, 10*time.Minute, 100) + cacheStore, err := nbcache.NewStore(ctx, 30*time.Minute, 10*time.Minute, 100) require.NoError(t, err) - pkceStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := nbgrpc.NewOneTimeTokenStore(ctx, cacheStore) + pkceStore := nbgrpc.NewPKCEVerifierStore(ctx, cacheStore) usersManager := users.NewManager(testStore) diff --git a/management/server/http/testing/testing_tools/channel/channel.go b/management/server/http/testing/testing_tools/channel/channel.go index d9d85a0a2..0203d6177 100644 --- a/management/server/http/testing/testing_tools/channel/channel.go +++ b/management/server/http/testing/testing_tools/channel/channel.go @@ -35,6 +35,7 @@ import ( "github.com/netbirdio/netbird/management/server/account" "github.com/netbirdio/netbird/management/server/activity" serverauth "github.com/netbirdio/netbird/management/server/auth" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/geolocation" "github.com/netbirdio/netbird/management/server/groups" http2 "github.com/netbirdio/netbird/management/server/http" @@ -87,22 +88,22 @@ func BuildApiBlackBoxWithDBState(t testing_tools.TB, sqlFile string, expectedPee jobManager := job.NewJobManager(nil, store, peersManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + t.Fatalf("Failed to create cache store: %v", err) + } + requestBuffer := server.NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsManager, "", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peersManager), &config.Config{}) - am, err := server.BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false) + am, err := server.BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false, cacheStore) if err != nil { t.Fatalf("Failed to create manager: %v", err) } accessLogsManager := accesslogsmanager.NewManager(store, permissionsManager, nil) - proxyTokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, 5*time.Minute, 10*time.Minute, 100) - if err != nil { - t.Fatalf("Failed to create proxy token store: %v", err) - } - pkceverifierStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - if err != nil { - t.Fatalf("Failed to create PKCE verifier store: %v", err) - } + proxyTokenStore := nbgrpc.NewOneTimeTokenStore(ctx, cacheStore) + pkceverifierStore := nbgrpc.NewPKCEVerifierStore(ctx, cacheStore) noopMeter := noop.NewMeterProvider().Meter("") proxyMgr, err := proxymanager.NewManager(store, noopMeter) if err != nil { @@ -216,22 +217,22 @@ func BuildApiBlackBoxWithDBStateAndPeerChannel(t testing_tools.TB, sqlFile strin jobManager := job.NewJobManager(nil, store, peersManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + t.Fatalf("Failed to create cache store: %v", err) + } + requestBuffer := server.NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsManager, "", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peersManager), &config.Config{}) - am, err := server.BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false) + am, err := server.BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false, cacheStore) if err != nil { t.Fatalf("Failed to create manager: %v", err) } accessLogsManager := accesslogsmanager.NewManager(store, permissionsManager, nil) - proxyTokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, 5*time.Minute, 10*time.Minute, 100) - if err != nil { - t.Fatalf("Failed to create proxy token store: %v", err) - } - pkceverifierStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - if err != nil { - t.Fatalf("Failed to create PKCE verifier store: %v", err) - } + proxyTokenStore := nbgrpc.NewOneTimeTokenStore(ctx, cacheStore) + pkceverifierStore := nbgrpc.NewPKCEVerifierStore(ctx, cacheStore) noopMeter := noop.NewMeterProvider().Meter("") proxyMgr, err := proxymanager.NewManager(store, noopMeter) if err != nil { diff --git a/management/server/identity_provider_test.go b/management/server/identity_provider_test.go index 9fce6b9c0..d51254c55 100644 --- a/management/server/identity_provider_test.go +++ b/management/server/identity_provider_test.go @@ -8,6 +8,7 @@ import ( "net/http/httptest" "path/filepath" "testing" + "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -19,6 +20,7 @@ import ( ephemeral_manager "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral/manager" "github.com/netbirdio/netbird/management/internals/server/config" "github.com/netbirdio/netbird/management/server/activity" + "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/idp" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" @@ -83,10 +85,15 @@ func createManagerWithEmbeddedIdP(t testing.TB) (*DefaultAccountManager, *update permissionsManager := permissions.NewManager(testStore) peersManager := peers.NewManager(testStore, permissionsManager) + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, nil, err + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, testStore) networkMapController := controller.NewController(ctx, testStore, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(testStore, peersManager), &config.Config{}) - manager, err := BuildManager(ctx, &config.Config{}, testStore, networkMapController, job.NewJobManager(nil, testStore, peersManager), idpManager, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + manager, err := BuildManager(ctx, &config.Config{}, testStore, networkMapController, job.NewJobManager(nil, testStore, peersManager), idpManager, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) if err != nil { return nil, nil, err } diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index 090c99877..4e6eb0a33 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -29,6 +29,7 @@ import ( "github.com/netbirdio/netbird/management/internals/server/config" nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" "github.com/netbirdio/netbird/management/server/activity" + "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/groups" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" @@ -369,9 +370,15 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config requestBuffer := NewAccountRequestBuffer(ctx, store) ephemeralMgr := manager.NewEphemeralManager(store, peers.NewManager(store, permissionsManager)) + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + cleanup() + return nil, nil, "", cleanup, err + } + networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), ephemeralMgr, config) accountManager, err := BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", - eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) if err != nil { cleanup() diff --git a/management/server/management_test.go b/management/server/management_test.go index de02855bf..3ac28cd4a 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -28,6 +28,7 @@ import ( nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" "github.com/netbirdio/netbird/management/server" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/groups" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" @@ -207,6 +208,12 @@ func startServer( jobManager := job.NewJobManager(nil, str, peersManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + t.Fatalf("failed creating cache store: %v", err) + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := server.NewAccountRequestBuffer(ctx, str) networkMapController := controller.NewController(ctx, str, metrics, updateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(str, peers.NewManager(str, permissionsManager)), config) @@ -227,7 +234,8 @@ func startServer( port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, - false) + false, + cacheStore) if err != nil { t.Fatalf("failed creating an account manager: %v", err) } diff --git a/management/server/nameserver_test.go b/management/server/nameserver_test.go index 90b4b9687..d10d4464f 100644 --- a/management/server/nameserver_test.go +++ b/management/server/nameserver_test.go @@ -17,6 +17,7 @@ import ( ephemeral_manager "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral/manager" "github.com/netbirdio/netbird/management/internals/server/config" "github.com/netbirdio/netbird/management/server/activity" + "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" nbpeer "github.com/netbirdio/netbird/management/server/peer" @@ -794,11 +795,17 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) { peersManager := peers.NewManager(store, permissionsManager) ctx := context.Background() + + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, err + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peers.NewManager(store, permissionsManager)), &config.Config{}) - return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) } func createNSStore(t *testing.T) (store.Store, error) { diff --git a/management/server/peer_test.go b/management/server/peer_test.go index 51c16d730..6f8d924fd 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -32,6 +32,7 @@ import ( ephemeral_manager "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral/manager" "github.com/netbirdio/netbird/management/internals/server/config" "github.com/netbirdio/netbird/management/internals/shared/grpc" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/http/testing/testing_tools" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" @@ -1294,11 +1295,15 @@ func Test_RegisterPeerByUser(t *testing.T) { peersManager := peers.NewManager(s, permissionsManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + require.NoError(t, err) + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, s) networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(s, peers.NewManager(s, permissionsManager)), &config.Config{}) - am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) assert.NoError(t, err) existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" @@ -1380,11 +1385,15 @@ func Test_RegisterPeerBySetupKey(t *testing.T) { peersManager := peers.NewManager(s, permissionsManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + require.NoError(t, err) + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, s) networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(s, peers.NewManager(s, permissionsManager)), &config.Config{}) - am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) assert.NoError(t, err) existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" @@ -1534,11 +1543,15 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) { peersManager := peers.NewManager(s, permissionsManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + require.NoError(t, err) + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, s) networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(s, peers.NewManager(s, permissionsManager)), &config.Config{}) - am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) assert.NoError(t, err) existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" @@ -1615,11 +1628,15 @@ func Test_LoginPeer(t *testing.T) { peersManager := peers.NewManager(s, permissionsManager) ctx := context.Background() + + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + require.NoError(t, err) + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, s) networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(s, peers.NewManager(s, permissionsManager)), &config.Config{}) - am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) assert.NoError(t, err) existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" diff --git a/management/server/route_test.go b/management/server/route_test.go index d4882eff8..91b2cf982 100644 --- a/management/server/route_test.go +++ b/management/server/route_test.go @@ -20,6 +20,7 @@ import ( ephemeral_manager "github.com/netbirdio/netbird/management/internals/modules/peers/ephemeral/manager" "github.com/netbirdio/netbird/management/internals/server/config" "github.com/netbirdio/netbird/management/server/activity" + "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/job" resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types" @@ -1293,11 +1294,17 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, *update_channel. peersManager := peers.NewManager(store, permissionsManager) ctx := context.Background() + + cacheStore, err := cache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + return nil, nil, err + } + updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peers.NewManager(store, permissionsManager)), &config.Config{}) - am, err := BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + am, err := BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store, peersManager), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false, cacheStore) if err != nil { return nil, nil, err } diff --git a/proxy/management_integration_test.go b/proxy/management_integration_test.go index 17510f37e..4b1ecf922 100644 --- a/proxy/management_integration_test.go +++ b/proxy/management_integration_test.go @@ -22,6 +22,7 @@ import ( nbproxy "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy" "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service" nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/users" @@ -113,11 +114,11 @@ func setupIntegrationTest(t *testing.T) *integrationTestSetup { } // Create real token store - tokenStore, err := nbgrpc.NewOneTimeTokenStore(ctx, 5*time.Minute, 10*time.Minute, 100) + cacheStore, err := nbcache.NewStore(ctx, 30*time.Minute, 10*time.Minute, 100) require.NoError(t, err) - pkceStore, err := nbgrpc.NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100) - require.NoError(t, err) + tokenStore := nbgrpc.NewOneTimeTokenStore(ctx, cacheStore) + pkceStore := nbgrpc.NewPKCEVerifierStore(ctx, cacheStore) // Create real users manager usersManager := users.NewManager(testStore) diff --git a/shared/management/client/client_test.go b/shared/management/client/client_test.go index f5edb6b95..d9a1a7d65 100644 --- a/shared/management/client/client_test.go +++ b/shared/management/client/client_test.go @@ -31,6 +31,7 @@ import ( "github.com/netbirdio/netbird/management/internals/server/config" mgmt "github.com/netbirdio/netbird/management/server" "github.com/netbirdio/netbird/management/server/activity" + nbcache "github.com/netbirdio/netbird/management/server/cache" "github.com/netbirdio/netbird/management/server/groups" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/mock_server" @@ -95,9 +96,16 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { settingsManagerMock := settings.NewMockManager(ctrl) jobManager := job.NewJobManager(nil, store, peersManger) - ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManger, settingsManagerMock, eventStore) + ctx := context.Background() - metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) + cacheStore, err := nbcache.NewStore(ctx, 100*time.Millisecond, 300*time.Millisecond, 100) + if err != nil { + t.Fatal(err) + } + + ia, _ := integrations.NewIntegratedValidator(ctx, peersManger, settingsManagerMock, eventStore, cacheStore) + + metrics, err := telemetry.NewDefaultAppMetrics(ctx) require.NoError(t, err) settingsMockManager := settings.NewMockManager(ctrl) @@ -116,11 +124,10 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { Return(&types.ExtraSettings{}, nil). AnyTimes() - ctx := context.Background() updateManager := update_channel.NewPeersUpdateManager(metrics) requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store) networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock(), ephemeral_manager.NewEphemeralManager(store, peersManger), config) - accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false) + accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false, cacheStore) if err != nil { t.Fatal(err) }