Refactor sync fast path tests and fix CI flakiness

- Introduce `skipOnWindows` helper to properly skip tests relying on Unix specific paths.
- Replace fixed sleep with `require.Eventually` in `waitForPeerDisconnect` to address flakiness in CI.
- Split `commitFastPath` logic out of `runFastPathSync` to close race conditions and improve clarity.
- Update tests to leverage new helpers and more precise assertions (e.g., `waitForPeerDisconnect`).
- Add `flakyStore` test helper to exercise fail-closed behavior in flag handling.
- Enhance `RunFastPathFlagRoutine` to disable the flag on store read errors.
This commit is contained in:
mlsmaycon
2026-04-21 17:07:31 +02:00
parent 93391fc68f
commit 3eb1298cb4
5 changed files with 169 additions and 57 deletions

View File

@@ -67,6 +67,9 @@ func (f *FastPathFlag) setEnabled(v bool) {
// flag is toggled cluster-wide by writing the key in Redis) and falls back to
// an in-process gocache otherwise, which is enough for single-replica dev and
// test setups.
//
// The routine fails closed: any store read error (other than a plain "key not
// found" miss) disables the flag until Redis confirms it is enabled again.
func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface, interval time.Duration, flagKey string) *FastPathFlag {
flag := &FastPathFlag{}
@@ -92,11 +95,10 @@ func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface
value, err := flagCache.Get(getCtx, flagKey)
if err != nil {
var notFound *store.NotFound
if errors.As(err, &notFound) {
flag.setEnabled(false)
return
if !errors.As(err, &notFound) {
log.Errorf("Sync fast-path flag refresh: %v; disabling fast path", err)
}
log.Debugf("Sync fast-path flag refresh: %v", err)
flag.setEnabled(false)
return
}
flag.setEnabled(parseFastPathFlag(value))

View File

@@ -2,6 +2,8 @@ package grpc
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
@@ -127,3 +129,48 @@ func newFastPathTestStore(t *testing.T) store.StoreInterface {
t.Helper()
return gocache_store.NewGoCache(gocache.New(5*time.Minute, 10*time.Minute))
}
func TestRunFastPathFlagRoutine_FailsClosedOnReadError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
s := &flakyStore{
StoreInterface: newFastPathTestStore(t),
}
require.NoError(t, s.Set(ctx, "peerSyncFastPath", "1"), "seed flag enabled")
flag := RunFastPathFlagRoutine(ctx, s, 50*time.Millisecond, "peerSyncFastPath")
require.NotNil(t, flag)
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should flip enabled while store reads succeed")
s.setGetError(errors.New("simulated transient store failure"))
assert.Eventually(t, func() bool {
return !flag.Enabled()
}, 2*time.Second, 25*time.Millisecond, "flag should flip disabled on store read error (fail-closed)")
s.setGetError(nil)
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should recover once the store read succeeds again")
}
// flakyStore wraps a real store and lets tests inject a transient Get error
// without affecting Set/Delete. Used to exercise fail-closed behaviour.
type flakyStore struct {
store.StoreInterface
getErr atomic.Pointer[error]
}
func (f *flakyStore) Get(ctx context.Context, key any) (any, error) {
if errPtr := f.getErr.Load(); errPtr != nil && *errPtr != nil {
return nil, *errPtr
}
return f.StoreInterface.Get(ctx, key)
}
func (f *flakyStore) setGetError(err error) {
if err == nil {
f.getErr.Store(nil)
return
}
f.getErr.Store(&err)
}

View File

@@ -158,42 +158,77 @@ func (s *Server) tryFastPathSync(
return false, nil
}
return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, realIP, peerMetaHash, srv, unlock)
peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, network.CurrentSerial())
if !committed {
return false, nil
}
return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, peerMetaHash, srv, unlock)
}
// runFastPathSync executes the fast path: mark connected, send lean response,
// open the update channel, kick off token refresh, release the per-peer lock,
// then block on handleUpdates until the stream is closed.
func (s *Server) runFastPathSync(
// commitFastPath marks the peer connected, subscribes it to network-map
// updates, then re-checks the account serial to close the race between the
// eligibility check and the subscription. If the serial advanced in that
// window the subscription is torn down and the caller falls back to the slow
// path so the peer receives the new state. Returns committed=false on any
// failure that should not block the slow path from running.
func (s *Server) commitFastPath(
ctx context.Context,
reqStart, syncStart time.Time,
accountID string,
peerKey wgtypes.Key,
realIP net.IP,
peerMetaHash uint64,
srv proto.ManagementService_SyncServer,
unlock *func(),
) error {
syncStart time.Time,
expectedSerial uint64,
) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) {
if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil {
log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err)
}
peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
if err != nil {
s.syncSem.Add(-1)
return mapError(ctx, err)
}
if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil {
log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err)
s.syncSem.Add(-1)
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return err
log.WithContext(ctx).Debugf("fast path: lookup peer %s: %v", peerKey.String(), err)
return nil, nil, false
}
updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID)
if err != nil {
log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err)
return nil, nil, false
}
network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID)
if err != nil {
log.WithContext(ctx).Debugf("fast path: re-check account network: %v", err)
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return nil, nil, false
}
if network.CurrentSerial() != expectedSerial {
log.WithContext(ctx).Debugf("fast path: serial advanced from %d to %d after subscribe, falling back to slow path for peer %s", expectedSerial, network.CurrentSerial(), peerKey.String())
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return nil, nil, false
}
return peer, updates, true
}
// runFastPathSync executes the fast path: send the lean response, kick off
// token refresh, release the per-peer lock, then block on handleUpdates until
// the stream is closed. Peer lookup and subscription have already been
// performed by commitFastPath so the race between eligibility check and
// subscription is already closed.
func (s *Server) runFastPathSync(
ctx context.Context,
reqStart, syncStart time.Time,
accountID string,
peerKey wgtypes.Key,
peer *nbpeer.Peer,
updates chan *network_map.UpdateMessage,
peerMetaHash uint64,
srv proto.ManagementService_SyncServer,
unlock *func(),
) error {
if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil {
log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err)
s.syncSem.Add(-1)
s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart)
return err