diff --git a/client/cmd/testutil_test.go b/client/cmd/testutil_test.go index fd1007bb4..d7c48b1f2 100644 --- a/client/cmd/testutil_test.go +++ b/client/cmd/testutil_test.go @@ -135,7 +135,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp if err != nil { t.Fatal(err) } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil) if err != nil { t.Fatal(err) } diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index f4c5be70a..936083074 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -1671,7 +1671,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri if err != nil { return nil, "", err } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil) if err != nil { return nil, "", err } diff --git a/client/server/server_test.go b/client/server/server_test.go index 54ad47e55..f0b9abce3 100644 --- a/client/server/server_test.go +++ b/client/server/server_test.go @@ -335,7 +335,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve if err != nil { return nil, "", err } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil) + mgmtServer, err := 
nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, &server.MockIntegratedValidator{}, networkMapController, nil, nil, nil) if err != nil { return nil, "", err } diff --git a/management/internals/server/boot.go b/management/internals/server/boot.go index ff2c27ac3..a6d376878 100644 --- a/management/internals/server/boot.go +++ b/management/internals/server/boot.go @@ -164,7 +164,8 @@ func (s *BaseServer) GRPCServer() *grpc.Server { gRPCAPIHandler := grpc.NewServer(gRPCOpts...) peerSerialCache := nbgrpc.NewPeerSerialCache(context.Background(), s.CacheStore(), nbgrpc.DefaultPeerSerialCacheTTL) - srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache) + fastPathFlag := nbgrpc.RunFastPathFlagRoutine(context.Background(), nbgrpc.DefaultFastPathFlagInterval, nbgrpc.DefaultFastPathRedisKey) + srv, err := nbgrpc.NewServer(s.Config, s.AccountManager(), s.SettingsManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController(), s.OAuthConfigProvider(), peerSerialCache, fastPathFlag) if err != nil { log.Fatalf("failed to create management server: %v", err) } diff --git a/management/internals/shared/grpc/fast_path_flag.go b/management/internals/shared/grpc/fast_path_flag.go new file mode 100644 index 000000000..b269ba95f --- /dev/null +++ b/management/internals/shared/grpc/fast_path_flag.go @@ -0,0 +1,155 @@ +package grpc + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "sync/atomic" + "time" + + "github.com/redis/go-redis/v9" + log "github.com/sirupsen/logrus" +) + +const ( + fastPathRedisURLEnv = "NB_PEER_SYNC_REDIS_ADDRESS" + + // DefaultFastPathFlagInterval is the default poll interval for the Sync + // fast-path feature flag. 
Kept lower than the log-level overrider because + // operators will want the toggle to propagate quickly during rollout. + DefaultFastPathFlagInterval = 1 * time.Minute + + // DefaultFastPathRedisKey is the Redis key polled by RunFastPathFlagRoutine + // when the caller does not provide an override. + DefaultFastPathRedisKey = "peerSyncFastPath" +) + +// FastPathFlag exposes the current on/off state of the Sync fast path. The +// zero value and a nil receiver both report disabled, so callers can always +// treat the flag as a non-nil gate without an additional nil check. +type FastPathFlag struct { + enabled atomic.Bool +} + +// NewFastPathFlag returns a FastPathFlag whose state is set to the given +// value. Callers that need the runtime Redis-backed toggle should use +// RunFastPathFlagRoutine instead; this constructor is meant for tests and +// for consumers that want to force the flag on or off. +func NewFastPathFlag(enabled bool) *FastPathFlag { + f := &FastPathFlag{} + f.setEnabled(enabled) + return f +} + +// Enabled reports whether the Sync fast path is currently enabled for this +// replica. A nil receiver reports false so a disabled build or test can pass +// a nil flag and skip the fast path entirely. +func (f *FastPathFlag) Enabled() bool { + if f == nil { + return false + } + return f.enabled.Load() +} + +func (f *FastPathFlag) setEnabled(v bool) { + if f == nil { + return + } + f.enabled.Store(v) +} + +// RunFastPathFlagRoutine starts a background goroutine that polls Redis for +// the Sync fast-path feature flag and updates the returned FastPathFlag +// accordingly. When NB_PEER_SYNC_REDIS_ADDRESS is not set the routine logs and +// returns a handle that stays permanently disabled, so every Sync falls back +// to the full network map path. 
+func RunFastPathFlagRoutine(ctx context.Context, interval time.Duration, redisKey string) *FastPathFlag {
+	// The handle is returned on every path below; it reports disabled until a
+	// poll reads an enabling value from Redis.
+	flag := &FastPathFlag{}
+
+	redisEnvAddr := os.Getenv(fastPathRedisURLEnv)
+	if redisEnvAddr == "" {
+		log.Infof("Environment variable %s not set. Sync fast path disabled", fastPathRedisURLEnv)
+		return flag
+	}
+
+	client, err := getFastPathRedisStore(ctx, redisEnvAddr)
+	if err != nil {
+		log.Errorf("Unable to connect to Redis at %v for Sync fast-path flag: %v", redisEnvAddr, err)
+		return flag
+	}
+
+	if redisKey == "" {
+		redisKey = DefaultFastPathRedisKey
+	}
+
+	// time.NewTicker panics on a non-positive interval, and that panic would
+	// be raised inside the polling goroutine below where no caller can recover
+	// it. Fall back to the default interval instead of crashing the process.
+	if interval <= 0 {
+		log.Warnf("Invalid Sync fast-path flag interval %v, using default %v", interval, DefaultFastPathFlagInterval)
+		interval = DefaultFastPathFlagInterval
+	}
+
+	go func() {
+		ticker := time.NewTicker(interval)
+		// The goroutine owns the Redis client, so it releases it on exit.
+		defer func() {
+			ticker.Stop()
+			if err := client.Close(); err != nil {
+				log.Debugf("close Sync fast-path redis client: %v", err)
+			}
+		}()
+
+		// refresh performs one bounded read of the flag key and updates the
+		// shared handle.
+		refresh := func() {
+			getCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+			defer cancel()
+
+			value, err := client.Get(getCtx, redisKey).Result()
+			switch {
+			case errors.Is(err, redis.Nil):
+				// The key does not exist: treat it as an explicit "off".
+				flag.setEnabled(false)
+			case err != nil:
+				// A failed read leaves the previous state untouched. When the
+				// parent context is already cancelled the failure is just
+				// shutdown noise, so it is not reported as an error.
+				if ctx.Err() != nil {
+					return
+				}
+				log.Errorf("Unable to get Sync fast-path flag from redis at %v: %v", redisEnvAddr, err)
+			default:
+				flag.setEnabled(parseFastPathFlag(value))
+			}
+		}
+
+		// Read once immediately so the flag does not wait a full interval
+		// before reflecting the value stored in Redis.
+		refresh()
+
+		for {
+			select {
+			case <-ctx.Done():
+				log.Infof("Stopping Sync fast-path flag routine")
+				return
+			case <-ticker.C:
+				refresh()
+			}
+		}
+	}()
+
+	return flag
+}
+
+// parseFastPathFlag accepts "1" or "true" (any casing, surrounding whitespace
+// tolerated) as enabled and treats every other value as disabled. Missing
+// keys surface as redis.Nil in the caller and also resolve to disabled.
+func parseFastPathFlag(value string) bool { + v := strings.TrimSpace(value) + if v == "1" { + return true + } + return strings.EqualFold(v, "true") +} + +func getFastPathRedisStore(ctx context.Context, redisEnvAddr string) (*redis.Client, error) { + options, err := redis.ParseURL(redisEnvAddr) + if err != nil { + return nil, fmt.Errorf("parse redis fast-path url: %w", err) + } + + client := redis.NewClient(options) + subCtx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + if _, err := client.Ping(subCtx).Result(); err != nil { + return nil, err + } + + log.WithContext(subCtx).Infof("using redis for Sync fast-path flag at %s", redisEnvAddr) + return client, nil +} diff --git a/management/internals/shared/grpc/fast_path_flag_test.go b/management/internals/shared/grpc/fast_path_flag_test.go new file mode 100644 index 000000000..53cc77d07 --- /dev/null +++ b/management/internals/shared/grpc/fast_path_flag_test.go @@ -0,0 +1,161 @@ +package grpc + +import ( + "context" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + testcontainersredis "github.com/testcontainers/testcontainers-go/modules/redis" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestParseFastPathFlag(t *testing.T) { + tests := []struct { + name string + value string + want bool + }{ + {"one", "1", true}, + {"true lowercase", "true", true}, + {"true uppercase", "TRUE", true}, + {"true mixed case", "True", true}, + {"true with whitespace", " true ", true}, + {"zero", "0", false}, + {"false", "false", false}, + {"empty", "", false}, + {"yes", "yes", false}, + {"garbage", "garbage", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, parseFastPathFlag(tt.value), "parseFastPathFlag(%q)", tt.value) + }) + } +} + +func TestFastPathFlag_EnabledDefaultsFalse(t *testing.T) { + flag := 
&FastPathFlag{} + assert.False(t, flag.Enabled(), "zero value flag should report disabled") +} + +func TestFastPathFlag_NilSafeEnabled(t *testing.T) { + var flag *FastPathFlag + assert.False(t, flag.Enabled(), "nil flag should report disabled without panicking") +} + +func TestFastPathFlag_SetEnabled(t *testing.T) { + flag := &FastPathFlag{} + flag.setEnabled(true) + assert.True(t, flag.Enabled(), "flag should report enabled after setEnabled(true)") + flag.setEnabled(false) + assert.False(t, flag.Enabled(), "flag should report disabled after setEnabled(false)") +} + +func TestFastPathRedisStore_InvalidURL(t *testing.T) { + _, err := getFastPathRedisStore(context.Background(), "invalid-url") + assert.Error(t, err, "Should fail with invalid URL") +} + +func TestFastPathRedisStore_UnreachableHost(t *testing.T) { + _, err := getFastPathRedisStore(context.Background(), "redis://127.0.0.1:59998") + assert.Error(t, err, "Should fail when Redis is unreachable") +} + +func TestRunFastPathFlagRoutine_DisabledWithoutEnvVar(t *testing.T) { + t.Setenv(fastPathRedisURLEnv, "") + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "any-key") + require.NotNil(t, flag, "RunFastPathFlagRoutine should always return a non-nil flag") + assert.False(t, flag.Enabled(), "flag should stay disabled when env var is unset") + + time.Sleep(250 * time.Millisecond) + assert.False(t, flag.Enabled(), "flag should remain disabled even after wait when env var is unset") +} + +func TestRunFastPathFlagRoutine_ReadsFlagFromRedis(t *testing.T) { + redisURL, client := setupFastPathRedisContainer(t) + + t.Setenv(fastPathRedisURLEnv, redisURL) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "peerSyncFastPath") + require.NotNil(t, flag) + assert.False(t, flag.Enabled(), "flag should start disabled with no key") + + err := 
client.Set(ctx, "peerSyncFastPath", "1", 0).Err() + require.NoError(t, err, "set redis key must succeed") + + assert.Eventually(t, flag.Enabled, 3*time.Second, 50*time.Millisecond, "flag should flip to enabled after Redis key is set to 1") + + err = client.Set(ctx, "peerSyncFastPath", "0", 0).Err() + require.NoError(t, err, "reset redis key must succeed") + + assert.Eventually(t, func() bool { + return !flag.Enabled() + }, 3*time.Second, 50*time.Millisecond, "flag should flip back to disabled after Redis key is set to 0") + + err = client.Del(ctx, "peerSyncFastPath").Err() + require.NoError(t, err, "delete redis key must succeed") + + err = client.Set(ctx, "peerSyncFastPath", "true", 0).Err() + require.NoError(t, err) + assert.Eventually(t, flag.Enabled, 3*time.Second, 50*time.Millisecond, "flag should accept \"true\" as enabled") +} + +func TestRunFastPathFlagRoutine_MissingKeyKeepsDisabled(t *testing.T) { + redisURL, _ := setupFastPathRedisContainer(t) + t.Setenv(fastPathRedisURLEnv, redisURL) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + flag := RunFastPathFlagRoutine(ctx, 100*time.Millisecond, "peerSyncFastPathAbsent") + require.NotNil(t, flag) + + time.Sleep(400 * time.Millisecond) + assert.False(t, flag.Enabled(), "flag should stay disabled when the key is missing in Redis") +} + +func setupFastPathRedisContainer(t *testing.T) (string, *redis.Client) { + t.Helper() + + ctx := context.Background() + redisContainer, err := testcontainersredis.RunContainer(ctx, + testcontainers.WithImage("redis:7"), + testcontainers.WithWaitStrategy( + wait.ForListeningPort("6379/tcp"), + ), + ) + require.NoError(t, err, "create redis test container") + + t.Cleanup(func() { + if err := redisContainer.Terminate(ctx); err != nil { + t.Logf("failed to terminate redis container: %s", err) + } + }) + + redisURL, err := redisContainer.ConnectionString(ctx) + require.NoError(t, err) + + options, err := redis.ParseURL(redisURL) + require.NoError(t, 
err) + + client := redis.NewClient(options) + t.Cleanup(func() { + if err := client.Close(); err != nil { + t.Logf("failed to close redis client: %s", err) + } + }) + + return redisURL, client +} diff --git a/management/internals/shared/grpc/server.go b/management/internals/shared/grpc/server.go index 0d3905048..49a893a26 100644 --- a/management/internals/shared/grpc/server.go +++ b/management/internals/shared/grpc/server.go @@ -88,11 +88,17 @@ type Server struct { // peerSerialCache lets Sync skip full network map computation when the peer // already has the latest account serial. A nil cache disables the fast path. peerSerialCache *PeerSerialCache + + // fastPathFlag is the runtime kill switch for the Sync fast path. A nil + // flag or a flag reporting disabled forces every Sync through the full + // network map path. + fastPathFlag *FastPathFlag } -// NewServer creates a new Management server. peerSerialCache is optional; when -// nil the Sync fast path is disabled and every request runs the full map -// computation, matching the pre-cache behaviour. +// NewServer creates a new Management server. peerSerialCache and fastPathFlag +// are both optional; when either is nil or the flag reports disabled, the +// Sync fast path is disabled and every request runs the full map computation, +// matching the pre-cache behaviour. 
func NewServer( config *nbconfig.Config, accountManager account.Manager, @@ -105,6 +111,7 @@ func NewServer( networkMapController network_map.Controller, oAuthConfigProvider idp.OAuthConfigProvider, peerSerialCache *PeerSerialCache, + fastPathFlag *FastPathFlag, ) (*Server, error) { if appMetrics != nil { // update gauge based on number of connected peers which is equal to open gRPC streams @@ -154,6 +161,7 @@ func NewServer( syncLimEnabled: syncLimEnabled, peerSerialCache: peerSerialCache, + fastPathFlag: fastPathFlag, }, nil } diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index 5819f062d..aaf0b4ae1 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -140,6 +140,9 @@ func (s *Server) tryFastPathSync( if s.peerSerialCache == nil { return false, nil } + if !s.fastPathFlag.Enabled() { + return false, nil + } if strings.EqualFold(peerMeta.GoOS, "android") { return false, nil } @@ -251,6 +254,9 @@ func (s *Server) recordPeerSyncEntry(peerKey string, netMap *nbtypes.NetworkMap, if s.peerSerialCache == nil { return } + if !s.fastPathFlag.Enabled() { + return + } if netMap == nil || netMap.Network == nil { return } @@ -268,6 +274,9 @@ func (s *Server) recordPeerSyncEntryFromUpdate(peerKey string, update *network_m if s.peerSerialCache == nil || update == nil || update.Update == nil || update.Update.NetworkMap == nil { return } + if !s.fastPathFlag.Enabled() { + return + } serial := update.Update.NetworkMap.GetSerial() if serial == 0 { return diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index fae609934..48147fc06 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -392,7 +392,8 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config } peerSerialCache := nbgrpc.NewPeerSerialCache(ctx, cacheStore, 
time.Minute) - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache) + fastPathFlag := nbgrpc.NewFastPathFlag(true) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, MockIntegratedValidator{}, networkMapController, nil, peerSerialCache, fastPathFlag) if err != nil { return nil, nil, "", cleanup, err } diff --git a/management/server/management_test.go b/management/server/management_test.go index f1d49193c..6396de6b6 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -257,6 +257,7 @@ func startServer( networkMapController, nil, nil, + nil, ) if err != nil { t.Fatalf("failed creating management server: %v", err) diff --git a/shared/management/client/client_test.go b/shared/management/client/client_test.go index a8e8172dc..bc566a058 100644 --- a/shared/management/client/client_test.go +++ b/shared/management/client/client_test.go @@ -138,7 +138,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { if err != nil { t.Fatal(err) } - mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil) + mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, jobManager, secretsManager, nil, nil, mgmt.MockIntegratedValidator{}, networkMapController, nil, nil, nil) if err != nil { t.Fatal(err) }