Replace Redis dependency with a generic cache store for fast path flag handling

This commit is contained in:
mlsmaycon
2026-04-21 16:28:24 +02:00
parent 3716838c25
commit 48c080b861
3 changed files with 75 additions and 126 deletions

View File

@@ -3,27 +3,24 @@ package grpc
import (
"context"
"errors"
"fmt"
"os"
"strings"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/store"
log "github.com/sirupsen/logrus"
)
const (
fastPathRedisURLEnv = "NB_PEER_SYNC_REDIS_ADDRESS"
// DefaultFastPathFlagInterval is the default poll interval for the Sync
// fast-path feature flag. Kept lower than the log-level overrider because
// operators will want the toggle to propagate quickly during rollout.
DefaultFastPathFlagInterval = 1 * time.Minute
// DefaultFastPathRedisKey is the Redis key polled by RunFastPathFlagRoutine
// DefaultFastPathFlagKey is the cache key polled by RunFastPathFlagRoutine
// when the caller does not provide an override.
DefaultFastPathRedisKey = "peerSyncFastPath"
DefaultFastPathFlagKey = "peerSyncFastPath"
)
// FastPathFlag exposes the current on/off state of the Sync fast path. The
@@ -34,7 +31,7 @@ type FastPathFlag struct {
}
// NewFastPathFlag returns a FastPathFlag whose state is set to the given
// value. Callers that need the runtime Redis-backed toggle should use
// value. Callers that need the runtime toggle should use
// RunFastPathFlagRoutine instead; this constructor is meant for tests and
// for consumers that want to force the flag on or off.
func NewFastPathFlag(enabled bool) *FastPathFlag {
@@ -60,50 +57,46 @@ func (f *FastPathFlag) setEnabled(v bool) {
f.enabled.Store(v)
}
// RunFastPathFlagRoutine starts a background goroutine that polls Redis for
// the Sync fast-path feature flag and updates the returned FastPathFlag
// accordingly. When NB_PEER_SYNC_REDIS_ADDRESS is not set the routine logs and
// returns a handle that stays permanently disabled, so every Sync falls back
// to the full network map path.
func RunFastPathFlagRoutine(ctx context.Context, interval time.Duration, redisKey string) *FastPathFlag {
// RunFastPathFlagRoutine starts a background goroutine that polls the shared
// cache store for the Sync fast-path feature flag and updates the returned
// FastPathFlag accordingly. When cacheStore is nil the routine returns a
// handle that stays permanently disabled, so every Sync falls back to the
// full network map path.
//
// The shared store is Redis-backed when NB_CACHE_REDIS_ADDRESS is set (so the
// flag is toggled cluster-wide by writing the key in Redis) and falls back to
// an in-process gocache otherwise, which is enough for single-replica dev and
// test setups.
func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface, interval time.Duration, flagKey string) *FastPathFlag {
flag := &FastPathFlag{}
redisEnvAddr := os.Getenv(fastPathRedisURLEnv)
if redisEnvAddr == "" {
log.Infof("Environment variable %s not set. Sync fast path disabled", fastPathRedisURLEnv)
if cacheStore == nil {
log.Infof("Shared cache store not provided. Sync fast path disabled")
return flag
}
client, err := getFastPathRedisStore(ctx, redisEnvAddr)
if err != nil {
log.Errorf("Unable to connect to Redis at %v for Sync fast-path flag: %v", redisEnvAddr, err)
return flag
if flagKey == "" {
flagKey = DefaultFastPathFlagKey
}
if redisKey == "" {
redisKey = DefaultFastPathRedisKey
}
flagCache := cache.New[string](cacheStore)
go func() {
ticker := time.NewTicker(interval)
defer func() {
ticker.Stop()
if err := client.Close(); err != nil {
log.Debugf("close Sync fast-path redis client: %v", err)
}
}()
defer ticker.Stop()
refresh := func() {
getCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
value, err := client.Get(getCtx, redisKey).Result()
if errors.Is(err, redis.Nil) {
flag.setEnabled(false)
return
}
value, err := flagCache.Get(getCtx, flagKey)
if err != nil {
log.Errorf("Unable to get Sync fast-path flag from redis at %v: %v", redisEnvAddr, err)
var notFound *store.NotFound
if errors.As(err, &notFound) {
flag.setEnabled(false)
return
}
log.Debugf("Sync fast-path flag refresh: %v", err)
return
}
flag.setEnabled(parseFastPathFlag(value))
@@ -126,8 +119,7 @@ func RunFastPathFlagRoutine(ctx context.Context, interval time.Duration, redisKe
}
// parseFastPathFlag accepts "1" or "true" (any casing, surrounding whitespace
// tolerated) as enabled and treats every other value as disabled. Missing
// keys surface as redis.Nil in the caller and also resolve to disabled.
// tolerated) as enabled and treats every other value as disabled.
func parseFastPathFlag(value string) bool {
v := strings.TrimSpace(value)
if v == "1" {
@@ -135,21 +127,3 @@ func parseFastPathFlag(value string) bool {
}
return strings.EqualFold(v, "true")
}
func getFastPathRedisStore(ctx context.Context, redisEnvAddr string) (*redis.Client, error) {
options, err := redis.ParseURL(redisEnvAddr)
if err != nil {
return nil, fmt.Errorf("parse redis fast-path url: %w", err)
}
client := redis.NewClient(options)
subCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if _, err := client.Ping(subCtx).Result(); err != nil {
return nil, err
}
log.WithContext(subCtx).Infof("using redis for Sync fast-path flag at %s", redisEnvAddr)
return client, nil
}