mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-21 17:56:39 +00:00
Centralize cache store creation to reuse a single Redis connection pool
Each cache consumer (IDP cache, token store, PKCE store, secrets manager, EDR validator) was independently calling NewStore, creating separate Redis clients with their own connection pools — up to 1400 potential connections from a single management server process. Introduce a shared CacheStore() singleton on BaseServer that creates one store at boot and injects it into all consumers. Consumer constructors now receive a store.StoreInterface instead of creating their own. For Redis mode, all consumers share one connection pool (1000 max conns). For in-memory mode, all consumers share one GoCache instance.
This commit is contained in:
@@ -9,13 +9,22 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
cachestore "github.com/eko/gocache/lib/v4/store"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy"
|
||||
nbcache "github.com/netbirdio/netbird/management/server/cache"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
func testCacheStore(t *testing.T) cachestore.StoreInterface {
|
||||
t.Helper()
|
||||
s, err := nbcache.NewStore(context.Background(), 30*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
return s
|
||||
}
|
||||
|
||||
type testProxyController struct {
|
||||
mu sync.Mutex
|
||||
clusterProxies map[string]map[string]struct{}
|
||||
@@ -114,11 +123,8 @@ func drainEmpty(ch chan *proto.GetMappingUpdateResponse) bool {
|
||||
|
||||
func TestSendServiceUpdateToCluster_UniqueTokensPerProxy(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t))
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
tokenStore: tokenStore,
|
||||
@@ -174,11 +180,8 @@ func TestSendServiceUpdateToCluster_UniqueTokensPerProxy(t *testing.T) {
|
||||
|
||||
func TestSendServiceUpdateToCluster_DeleteNoToken(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t))
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
tokenStore: tokenStore,
|
||||
@@ -211,11 +214,8 @@ func TestSendServiceUpdateToCluster_DeleteNoToken(t *testing.T) {
|
||||
|
||||
func TestSendServiceUpdate_UniqueTokensPerProxy(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
tokenStore, err := NewOneTimeTokenStore(ctx, time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(ctx, testCacheStore(t))
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
tokenStore: tokenStore,
|
||||
@@ -267,8 +267,7 @@ func generateState(s *ProxyServiceServer, redirectURL string) string {
|
||||
|
||||
func TestOAuthState_NeverTheSame(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
oidcConfig: ProxyOIDCConfig{
|
||||
@@ -296,8 +295,7 @@ func TestOAuthState_NeverTheSame(t *testing.T) {
|
||||
|
||||
func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
oidcConfig: ProxyOIDCConfig{
|
||||
@@ -307,7 +305,7 @@ func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) {
|
||||
}
|
||||
|
||||
// Old format had only 2 parts: base64(url)|hmac
|
||||
err = s.pkceVerifierStore.Store("base64url|hmac", "test", 10*time.Minute)
|
||||
err := s.pkceVerifierStore.Store("base64url|hmac", "test", 10*time.Minute)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = s.ValidateState("base64url|hmac")
|
||||
@@ -317,8 +315,7 @@ func TestValidateState_RejectsOldTwoPartFormat(t *testing.T) {
|
||||
|
||||
func TestValidateState_RejectsInvalidHMAC(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
pkceStore, err := NewPKCEVerifierStore(ctx, 10*time.Minute, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
pkceStore := NewPKCEVerifierStore(ctx, testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
oidcConfig: ProxyOIDCConfig{
|
||||
@@ -328,7 +325,7 @@ func TestValidateState_RejectsInvalidHMAC(t *testing.T) {
|
||||
}
|
||||
|
||||
// Store with tampered HMAC
|
||||
err = s.pkceVerifierStore.Store("dGVzdA==|nonce|wrong-hmac", "test", 10*time.Minute)
|
||||
err := s.pkceVerifierStore.Store("dGVzdA==|nonce|wrong-hmac", "test", 10*time.Minute)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, err = s.ValidateState("dGVzdA==|nonce|wrong-hmac")
|
||||
@@ -337,8 +334,7 @@ func TestValidateState_RejectsInvalidHMAC(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSendServiceUpdateToCluster_FiltersOnCapability(t *testing.T) {
|
||||
tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
tokenStore: tokenStore,
|
||||
@@ -410,8 +406,7 @@ func TestSendServiceUpdateToCluster_FiltersOnCapability(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSendServiceUpdateToCluster_TLSNotFiltered(t *testing.T) {
|
||||
tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t))
|
||||
|
||||
s := &ProxyServiceServer{
|
||||
tokenStore: tokenStore,
|
||||
@@ -442,8 +437,7 @@ func TestSendServiceUpdateToCluster_TLSNotFiltered(t *testing.T) {
|
||||
// scenario for an existing service, verifying the correct update types
|
||||
// reach the correct clusters.
|
||||
func TestServiceModifyNotifications(t *testing.T) {
|
||||
tokenStore, err := NewOneTimeTokenStore(context.Background(), time.Hour, 10*time.Minute, 100)
|
||||
require.NoError(t, err)
|
||||
tokenStore := NewOneTimeTokenStore(context.Background(), testCacheStore(t))
|
||||
|
||||
newServer := func() (*ProxyServiceServer, map[string]chan *proto.GetMappingUpdateResponse) {
|
||||
s := &ProxyServiceServer{
|
||||
|
||||
Reference in New Issue
Block a user