diff --git a/management/internals/server/boot.go b/management/internals/server/boot.go index a6d376878..4c47734fa 100644 --- a/management/internals/server/boot.go +++ b/management/internals/server/boot.go @@ -164,7 +164,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server { gRPCAPIHandler := grpc.NewServer(gRPCOpts...) peerSerialCache := nbgrpc.NewPeerSerialCache(context.Background(), s.CacheStore(), nbgrpc.DefaultPeerSerialCacheTTL) - fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathRedisKey) + fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), s.CacheStore(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathFlagKey) srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag) if err != nil { log.Fatalf("failed to create management server: %v", err) diff --git a/management/internals/shared/grpc/fast_path_flag.go b/management/internals/shared/grpc/fast_path_flag.go index b269ba95f..00b1dc26a 100644 --- a/management/internals/shared/grpc/fast_path_flag.go +++ b/management/internals/shared/grpc/fast_path_flag.go @@ -3,27 +3,24 @@ package grpc import ( "context" "errors" - "fmt" - "os" "strings" "sync/atomic" "time" - "github.com/redis/go-redis/v9" + "github.com/eko/gocache/lib/v4/cache" + "github.com/eko/gocache/lib/v4/store" log "github.com/sirupsen/logrus" ) const ( - fastPathRedisURLEnv = "NB_PEER_SYNC_REDIS_ADDRESS" - // DefaultFastPathFlagInterval is the default poll interval for the Sync // fast-path feature flag. Kept lower than the log-level overrider because // operators will want the toggle to propagate quickly during rollout. 
DefaultFastPathFlagInterval = 1 * time.Minute - // DefaultFastPathRedisKey is the Redis key polled by RunFastPathFlagRoutine + // DefaultFastPathFlagKey is the cache key polled by RunFastPathFlagRoutine // when the caller does not provide an override. - DefaultFastPathRedisKey = "peerSyncFastPath" + DefaultFastPathFlagKey = "peerSyncFastPath" ) // FastPathFlag exposes the current on/off state of the Sync fast path. The @@ -34,7 +31,7 @@ type FastPathFlag struct { } // NewFastPathFlag returns a FastPathFlag whose state is set to the given -// value. Callers that need the runtime Redis-backed toggle should use +// value. Callers that need the runtime toggle should use // RunFastPathFlagRoutine instead; this constructor is meant for tests and // for consumers that want to force the flag on or off. func NewFastPathFlag(enabled bool) *FastPathFlag { @@ -60,50 +57,46 @@ func (f *FastPathFlag) setEnabled(v bool) { f.enabled.Store(v) } -// RunFastPathFlagRoutine starts a background goroutine that polls Redis for -// the Sync fast-path feature flag and updates the returned FastPathFlag -// accordingly. When NB_PEER_SYNC_REDIS_ADDRESS is not set the routine logs and -// returns a handle that stays permanently disabled, so every Sync falls back -// to the full network map path. -func RunFastPathFlagRoutine(ctx context.Context, interval time.Duration, redisKey string) *FastPathFlag { +// RunFastPathFlagRoutine starts a background goroutine that polls the shared +// cache store for the Sync fast-path feature flag and updates the returned +// FastPathFlag accordingly. When cacheStore is nil the routine returns a +// handle that stays permanently disabled, so every Sync falls back to the +// full network map path. +// +// The shared store is Redis-backed when NB_CACHE_REDIS_ADDRESS is set (so the +// flag is toggled cluster-wide by writing the key in Redis) and falls back to +// an in-process gocache otherwise, which is enough for single-replica dev and +// test setups. 
+func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface, interval time.Duration, flagKey string) *FastPathFlag { flag := &FastPathFlag{} - redisEnvAddr := os.Getenv(fastPathRedisURLEnv) - if redisEnvAddr == "" { - log.Infof("Environment variable %s not set. Sync fast path disabled", fastPathRedisURLEnv) + if cacheStore == nil { + log.Infof("Shared cache store not provided. Sync fast path disabled") return flag } - client, err := getFastPathRedisStore(ctx, redisEnvAddr) - if err != nil { - log.Errorf("Unable to connect to Redis at %v for Sync fast-path flag: %v", redisEnvAddr, err) - return flag + if flagKey == "" { + flagKey = DefaultFastPathFlagKey } - if redisKey == "" { - redisKey = DefaultFastPathRedisKey - } + flagCache := cache.New[string](cacheStore) go func() { ticker := time.NewTicker(interval) - defer func() { - ticker.Stop() - if err := client.Close(); err != nil { - log.Debugf("close Sync fast-path redis client: %v", err) - } - }() + defer ticker.Stop() refresh := func() { getCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - value, err := client.Get(getCtx, redisKey).Result() - if errors.Is(err, redis.Nil) { - flag.setEnabled(false) - return - } + value, err := flagCache.Get(getCtx, flagKey) if err != nil { - log.Errorf("Unable to get Sync fast-path flag from redis at %v: %v", redisEnvAddr, err) + var notFound *store.NotFound + if errors.As(err, &notFound) { + flag.setEnabled(false) + return + } + log.Debugf("Sync fast-path flag refresh: %v", err) return } flag.setEnabled(parseFastPathFlag(value)) @@ -126,8 +119,7 @@ func RunFastPathFlagRoutine(ctx context.Context, interval time.Duration, redisKe } // parseFastPathFlag accepts "1" or "true" (any casing, surrounding whitespace -// tolerated) as enabled and treats every other value as disabled. Missing -// keys surface as redis.Nil in the caller and also resolve to disabled.
func parseFastPathFlag(value string) bool { v := strings.TrimSpace(value) if v == "1" { @@ -135,21 +127,3 @@ func parseFastPathFlag(value string) bool { } return strings.EqualFold(v, "true") } - -func getFastPathRedisStore(ctx context.Context, redisEnvAddr string) (*redis.Client, error) { - options, err := redis.ParseURL(redisEnvAddr) - if err != nil { - return nil, fmt.Errorf("parse redis fast-path url: %w", err) - } - - client := redis.NewClient(options) - subCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - - if _, err := client.Ping(subCtx).Result(); err != nil { - return nil, err - } - - log.WithContext(subCtx).Infof("using redis for Sync fast-path flag at %s", redisEnvAddr) - return client, nil -} diff --git a/management/internals/shared/grpc/fast_path_flag_test.go b/management/internals/shared/grpc/fast_path_flag_test.go index cd4ea89a4..0846a3f78 100644 --- a/management/internals/shared/grpc/fast_path_flag_test.go +++ b/management/internals/shared/grpc/fast_path_flag_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" - "github.com/redis/go-redis/v9" + "github.com/eko/gocache/lib/v4/store" + gocache_store "github.com/eko/gocache/store/go_cache/v4" + gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - testcontainersredis "github.com/testcontainers/testcontainers-go/modules/redis" ) func TestParseFastPathFlag(t *testing.T) { @@ -54,101 +55,75 @@ func TestFastPathFlag_SetEnabled(t *testing.T) { assert.False(t, flag.Enabled(), "flag should report disabled after setEnabled(false)") } -func TestFastPathRedisStore_InvalidURL(t *testing.T) { - _, err := getFastPathRedisStore(context.Background(), "invalid-url") - assert.Error(t, err, "Should fail with invalid URL") +func TestNewFastPathFlag(t *testing.T) { + assert.True(t, NewFastPathFlag(true).Enabled(), "NewFastPathFlag(true) should report enabled") + assert.False(t, NewFastPathFlag(false).Enabled(), "NewFastPathFlag(false) 
should report disabled") } -func TestFastPathRedisStore_UnreachableHost(t *testing.T) { - _, err := getFastPathRedisStore(context.Background(), "redis://127.0.0.1:59998") - assert.Error(t, err, "Should fail when Redis is unreachable") -} - -func TestRunFastPathFlagRoutine_DisabledWithoutEnvVar(t *testing.T) { - t.Setenv(fastPathRedisURLEnv, "") - +func TestRunFastPathFlagRoutine_NilStoreStaysDisabled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "any-key") + flag := RunFastPathFlagRoutine(ctx, nil, 50*time.Millisecond, "peerSyncFastPath") require.NotNil(t, flag, "RunFastPathFlagRoutine should always return a non-nil flag") - assert.False(t, flag.Enabled(), "flag should stay disabled when env var is unset") + assert.False(t, flag.Enabled(), "flag should stay disabled when no cache store is provided") - time.Sleep(250 * time.Millisecond) - assert.False(t, flag.Enabled(), "flag should remain disabled even after wait when env var is unset") + time.Sleep(150 * time.Millisecond) + assert.False(t, flag.Enabled(), "flag should remain disabled after wait when no cache store is provided") } -func TestRunFastPathFlagRoutine_ReadsFlagFromRedis(t *testing.T) { - redisURL, client := setupFastPathRedisContainer(t) - - t.Setenv(fastPathRedisURLEnv, redisURL) - +func TestRunFastPathFlagRoutine_ReadsFlagFromStore(t *testing.T) { + cacheStore := newFastPathTestStore(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "peerSyncFastPath") + flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "peerSyncFastPath") require.NotNil(t, flag) - assert.False(t, flag.Enabled(), "flag should start disabled with no key") + assert.False(t, flag.Enabled(), "flag should start disabled when the key is missing") - err := client.Set(ctx, "peerSyncFastPath", "1", 0).Err() - require.NoError(t, err, "set 
redis key must succeed") - - assert.Eventually(t, flag.Enabled, 3*time.Second, 50*time.Millisecond, "flag should flip to enabled after Redis key is set to 1") - - err = client.Set(ctx, "peerSyncFastPath", "0", 0).Err() - require.NoError(t, err, "reset redis key must succeed") + require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "1"), "seed flag=1 into shared store") + assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should flip enabled after the key is set to 1") + require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "0"), "override flag=0 into shared store") assert.Eventually(t, func() bool { return !flag.Enabled() - }, 3*time.Second, 50*time.Millisecond, "flag should flip back to disabled after Redis key is set to 0") + }, 2*time.Second, 25*time.Millisecond, "flag should flip disabled after the key is set to 0") - err = client.Del(ctx, "peerSyncFastPath").Err() - require.NoError(t, err, "delete redis key must succeed") + require.NoError(t, cacheStore.Delete(ctx, "peerSyncFastPath"), "remove flag key") + assert.Eventually(t, func() bool { + return !flag.Enabled() + }, 2*time.Second, 25*time.Millisecond, "flag should stay disabled after the key is deleted") - err = client.Set(ctx, "peerSyncFastPath", "true", 0).Err() - require.NoError(t, err) - assert.Eventually(t, flag.Enabled, 3*time.Second, 50*time.Millisecond, "flag should accept \"true\" as enabled") + require.NoError(t, cacheStore.Set(ctx, "peerSyncFastPath", "true"), "enable via string true") + assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should accept \"true\" as enabled") } func TestRunFastPathFlagRoutine_MissingKeyKeepsDisabled(t *testing.T) { - redisURL, _ := setupFastPathRedisContainer(t) - t.Setenv(fastPathRedisURLEnv, redisURL) - + cacheStore := newFastPathTestStore(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "peerSyncFastPathAbsent") + 
flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "peerSyncFastPathAbsent") require.NotNil(t, flag) - time.Sleep(400 * time.Millisecond) - assert.False(t, flag.Enabled(), "flag should stay disabled when the key is missing in Redis") + time.Sleep(200 * time.Millisecond) + assert.False(t, flag.Enabled(), "flag should stay disabled when the key is missing from the store") } -func setupFastPathRedisContainer(t *testing.T) (string, *redis.Client) { +func TestRunFastPathFlagRoutine_DefaultKeyUsedWhenEmpty(t *testing.T) { + cacheStore := newFastPathTestStore(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + require.NoError(t, cacheStore.Set(ctx, DefaultFastPathFlagKey, "1"), "seed default key") + + flag := RunFastPathFlagRoutine(ctx, cacheStore, 50*time.Millisecond, "") + require.NotNil(t, flag) + + assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "empty flagKey should fall back to DefaultFastPathFlagKey") +} + +func newFastPathTestStore(t *testing.T) store.StoreInterface { t.Helper() - - ctx := context.Background() - redisContainer, err := testcontainersredis.Run(ctx, "redis:7") - require.NoError(t, err, "create redis test container") - - t.Cleanup(func() { - if err := redisContainer.Terminate(ctx); err != nil { - t.Logf("failed to terminate redis container: %s", err) - } - }) - - redisURL, err := redisContainer.ConnectionString(ctx) - require.NoError(t, err) - - options, err := redis.ParseURL(redisURL) - require.NoError(t, err) - - client := redis.NewClient(options) - t.Cleanup(func() { - if err := client.Close(); err != nil { - t.Logf("failed to close redis client: %s", err) - } - }) - - return redisURL, client + return gocache_store.NewGoCache(gocache.New(5*time.Minute, 10*time.Minute)) }