From 3eb1298cb4ff3d26233315ebd867ed154c17a4e3 Mon Sep 17 00:00:00 2001 From: mlsmaycon Date: Tue, 21 Apr 2026 17:07:31 +0200 Subject: [PATCH] Refactor sync fast path tests and fix CI flakiness - Introduce `skipOnWindows` helper to properly skip tests relying on Unix specific paths. - Replace fixed sleep with `require.Eventually` in `waitForPeerDisconnect` to address flakiness in CI. - Split `commitFastPath` logic out of `runFastPathSync` to close race conditions and improve clarity. - Update tests to leverage new helpers and more precise assertions (e.g., `waitForPeerDisconnect`). - Add `flakyStore` test helper to exercise fail-closed behavior in flag handling. - Enhance `RunFastPathFlagRoutine` to disable the flag on store read errors. --- .../internals/shared/grpc/fast_path_flag.go | 10 ++- .../shared/grpc/fast_path_flag_test.go | 47 ++++++++++++ .../internals/shared/grpc/sync_fast_path.go | 73 ++++++++++++++----- management/server/sync_fast_path_test.go | 61 +++++++++++----- management/server/sync_legacy_wire_test.go | 35 +++++---- 5 files changed, 169 insertions(+), 57 deletions(-) diff --git a/management/internals/shared/grpc/fast_path_flag.go b/management/internals/shared/grpc/fast_path_flag.go index 00b1dc26a..136e37baf 100644 --- a/management/internals/shared/grpc/fast_path_flag.go +++ b/management/internals/shared/grpc/fast_path_flag.go @@ -67,6 +67,9 @@ func (f *FastPathFlag) setEnabled(v bool) { // flag is toggled cluster-wide by writing the key in Redis) and falls back to // an in-process gocache otherwise, which is enough for single-replica dev and // test setups. +// +// The routine fails closed: any store read error (other than a plain "key not +// found" miss) disables the flag until Redis confirms it is enabled again. 
func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface, interval time.Duration, flagKey string) *FastPathFlag { flag := &FastPathFlag{} @@ -92,11 +95,10 @@ func RunFastPathFlagRoutine(ctx context.Context, cacheStore store.StoreInterface value, err := flagCache.Get(getCtx, flagKey) if err != nil { var notFound *store.NotFound - if errors.As(err, &notFound) { - flag.setEnabled(false) - return + if !errors.As(err, &notFound) { + log.Errorf("Sync fast-path flag refresh: %v; disabling fast path", err) } - log.Debugf("Sync fast-path flag refresh: %v", err) + flag.setEnabled(false) return } flag.setEnabled(parseFastPathFlag(value)) diff --git a/management/internals/shared/grpc/fast_path_flag_test.go b/management/internals/shared/grpc/fast_path_flag_test.go index 0846a3f78..d3a21ad64 100644 --- a/management/internals/shared/grpc/fast_path_flag_test.go +++ b/management/internals/shared/grpc/fast_path_flag_test.go @@ -2,6 +2,8 @@ package grpc import ( "context" + "errors" + "sync/atomic" "testing" "time" @@ -127,3 +129,48 @@ func newFastPathTestStore(t *testing.T) store.StoreInterface { t.Helper() return gocache_store.NewGoCache(gocache.New(5*time.Minute, 10*time.Minute)) } + +func TestRunFastPathFlagRoutine_FailsClosedOnReadError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + s := &flakyStore{ + StoreInterface: newFastPathTestStore(t), + } + require.NoError(t, s.Set(ctx, "peerSyncFastPath", "1"), "seed flag enabled") + + flag := RunFastPathFlagRoutine(ctx, s, 50*time.Millisecond, "peerSyncFastPath") + require.NotNil(t, flag) + + assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should flip enabled while store reads succeed") + + s.setGetError(errors.New("simulated transient store failure")) + assert.Eventually(t, func() bool { + return !flag.Enabled() + }, 2*time.Second, 25*time.Millisecond, "flag should flip disabled on store read error (fail-closed)") + + s.setGetError(nil) + 
assert.Eventually(t, flag.Enabled, 2*time.Second, 25*time.Millisecond, "flag should recover once the store read succeeds again") +} + +// flakyStore wraps a real store and lets tests inject a transient Get error +// without affecting Set/Delete. Used to exercise fail-closed behaviour. +type flakyStore struct { + store.StoreInterface + getErr atomic.Pointer[error] +} + +func (f *flakyStore) Get(ctx context.Context, key any) (any, error) { + if errPtr := f.getErr.Load(); errPtr != nil && *errPtr != nil { + return nil, *errPtr + } + return f.StoreInterface.Get(ctx, key) +} + +func (f *flakyStore) setGetError(err error) { + if err == nil { + f.getErr.Store(nil) + return + } + f.getErr.Store(&err) +} diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index aaf0b4ae1..28fde2f11 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -158,42 +158,77 @@ func (s *Server) tryFastPathSync( return false, nil } - return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, realIP, peerMetaHash, srv, unlock) + peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, network.CurrentSerial()) + if !committed { + return false, nil + } + + return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, peerMetaHash, srv, unlock) } -// runFastPathSync executes the fast path: mark connected, send lean response, -// open the update channel, kick off token refresh, release the per-peer lock, -// then block on handleUpdates until the stream is closed. -func (s *Server) runFastPathSync( +// commitFastPath marks the peer connected, subscribes it to network-map +// updates, then re-checks the account serial to close the race between the +// eligibility check and the subscription. 
If the serial advanced in that +// window the subscription is torn down and the caller falls back to the slow +// path so the peer receives the new state. Returns committed=false on any +// failure that should not block the slow path from running. +func (s *Server) commitFastPath( ctx context.Context, - reqStart, syncStart time.Time, accountID string, peerKey wgtypes.Key, realIP net.IP, - peerMetaHash uint64, - srv proto.ManagementService_SyncServer, - unlock *func(), -) error { + syncStart time.Time, + expectedSerial uint64, +) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) { if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil { log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err) } peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String()) if err != nil { - s.syncSem.Add(-1) - return mapError(ctx, err) - } - - if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil { - log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err) - s.syncSem.Add(-1) - s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) - return err + log.WithContext(ctx).Debugf("fast path: lookup peer %s: %v", peerKey.String(), err) + return nil, nil, false } updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID) if err != nil { log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err) + return nil, nil, false + } + + network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) + if err != nil { + log.WithContext(ctx).Debugf("fast path: re-check account network: %v", err) + s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) + return nil, nil, false + } + if network.CurrentSerial() != expectedSerial { + log.WithContext(ctx).Debugf("fast path: serial 
advanced from %d to %d after subscribe, falling back to slow path for peer %s", expectedSerial, network.CurrentSerial(), peerKey.String()) + s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) + return nil, nil, false + } + + return peer, updates, true +} + +// runFastPathSync executes the fast path: send the lean response, kick off +// token refresh, release the per-peer lock, then block on handleUpdates until +// the stream is closed. Peer lookup and subscription have already been +// performed by commitFastPath so the race between eligibility check and +// subscription is already closed. +func (s *Server) runFastPathSync( + ctx context.Context, + reqStart, syncStart time.Time, + accountID string, + peerKey wgtypes.Key, + peer *nbpeer.Peer, + updates chan *network_map.UpdateMessage, + peerMetaHash uint64, + srv proto.ManagementService_SyncServer, + unlock *func(), +) error { + if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil { + log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err) s.syncSem.Add(-1) s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) return err diff --git a/management/server/sync_fast_path_test.go b/management/server/sync_fast_path_test.go index 8a3a40ab1..76e809558 100644 --- a/management/server/sync_fast_path_test.go +++ b/management/server/sync_fast_path_test.go @@ -12,10 +12,21 @@ import ( "github.com/netbirdio/netbird/encryption" "github.com/netbirdio/netbird/management/internals/server/config" + "github.com/netbirdio/netbird/management/server/store" mgmtProto "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/util" ) +// skipOnWindows skips the calling test on Windows. The in-process gRPC +// harness uses Unix socket / path conventions that do not cleanly map to +// Windows. 
+func skipOnWindows(t *testing.T) { + t.Helper() + if runtime.GOOS == "windows" { + t.Skip("skipping on windows; harness uses unix path conventions") + } +} + func fastPathTestConfig(t *testing.T) *config.Config { t.Helper() return &config.Config{ @@ -72,12 +83,21 @@ func openSync(t *testing.T, client mgmtProto.ManagementServiceClient, serverKey, return resp, cancel } -// waitForPeerDisconnect gives the server's handleUpdates goroutine a moment to -// notice the cancelled stream and run cancelPeerRoutines before the next open. -// Without this the new stream can race with the old one's channel close and -// trigger a spurious disconnect. -func waitForPeerDisconnect() { - time.Sleep(50 * time.Millisecond) +// waitForPeerDisconnect polls until the account manager reports the peer as +// disconnected (Status.Connected == false), which happens once the server's +// handleUpdates goroutine has run cancelPeerRoutines for the cancelled +// stream. The deadline is bounded so a stuck server fails the test rather +// than hanging. Replaces the former fixed 50ms sleep which was CI-flaky +// under load or with the race detector on. 
+func waitForPeerDisconnect(t *testing.T, am *DefaultAccountManager, peerPubKey string) { + t.Helper() + require.Eventually(t, func() bool { + peer, err := am.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey) + if err != nil { + return false + } + return !peer.Status.Connected + }, 2*time.Second, 10*time.Millisecond, "peer %s should be marked disconnected after stream cancel", peerPubKey) } func baseLinuxMeta() *mgmtProto.PeerSystemMeta { @@ -103,9 +123,7 @@ func androidMeta() *mgmtProto.PeerSystemMeta { } func TestSyncFastPath_FirstSync_SendsFullMap(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("skipping on windows; harness uses unix path conventions") - } + skipOnWindows(t) mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() @@ -128,7 +146,8 @@ func TestSyncFastPath_FirstSync_SendsFullMap(t *testing.T) { } func TestSyncFastPath_SecondSync_MatchingSerial_SkipsMap(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -145,7 +164,7 @@ func TestSyncFastPath_SecondSync_MatchingSerial_SkipsMap(t *testing.T) { first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta()) require.NotNil(t, first.NetworkMap, "first sync primes cache with a full map") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) second, cancel2 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta()) defer cancel2() @@ -158,7 +177,8 @@ func TestSyncFastPath_SecondSync_MatchingSerial_SkipsMap(t *testing.T) { } func TestSyncFastPath_AndroidNeverSkips(t *testing.T) { - 
mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -175,7 +195,7 @@ func TestSyncFastPath_AndroidNeverSkips(t *testing.T) { first, cancel1 := openSync(t, client, *serverKey, *keys[0], androidMeta()) require.NotNil(t, first.NetworkMap, "android first sync must deliver a full map") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) second, cancel2 := openSync(t, client, *serverKey, *keys[0], androidMeta()) defer cancel2() @@ -184,7 +204,8 @@ func TestSyncFastPath_AndroidNeverSkips(t *testing.T) { } func TestSyncFastPath_MetaChanged_SendsFullMap(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -201,7 +222,7 @@ func TestSyncFastPath_MetaChanged_SendsFullMap(t *testing.T) { first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta()) require.NotNil(t, first.NetworkMap, "first sync primes cache") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) changed := baseLinuxMeta() changed.Hostname = "linux-host-renamed" @@ -212,7 +233,8 @@ func TestSyncFastPath_MetaChanged_SendsFullMap(t *testing.T) { } func TestSyncFastPath_LoginInvalidatesCache(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := 
startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -233,7 +255,7 @@ func TestSyncFastPath_LoginInvalidatesCache(t *testing.T) { first, cancel1 := openSync(t, client, *serverKey, key, baseLinuxMeta()) require.NotNil(t, first.NetworkMap, "first sync primes cache") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, key.PublicKey().String()) // A subsequent login (e.g. SSH key rotation, re-auth) must clear the cache. _, err = loginPeerWithValidSetupKey(key, client) @@ -245,7 +267,8 @@ func TestSyncFastPath_LoginInvalidatesCache(t *testing.T) { } func TestSyncFastPath_OtherPeerRegistered_ForcesFullMap(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -262,7 +285,7 @@ func TestSyncFastPath_OtherPeerRegistered_ForcesFullMap(t *testing.T) { first, cancel1 := openSync(t, client, *serverKey, *keys[0], baseLinuxMeta()) require.NotNil(t, first.NetworkMap, "first sync primes cache at serial N") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) // Registering another peer bumps the account serial via IncrementNetworkSerial. 
_, err = registerPeers(1, client) diff --git a/management/server/sync_legacy_wire_test.go b/management/server/sync_legacy_wire_test.go index c6b223897..0fce68f53 100644 --- a/management/server/sync_legacy_wire_test.go +++ b/management/server/sync_legacy_wire_test.go @@ -5,7 +5,6 @@ import ( "os" "path/filepath" "testing" - "time" "github.com/golang/protobuf/proto" //nolint:staticcheck // matches the generator "github.com/stretchr/testify/require" @@ -46,6 +45,7 @@ func sendWireFixture(t *testing.T, client mgmtProto.ManagementServiceClient, ser } func TestSync_WireFixture_LegacyClients_AlwaysReceiveFullMap(t *testing.T) { + skipOnWindows(t) cases := []struct { name string fixture string @@ -87,12 +87,13 @@ func TestSync_WireFixture_LegacyClient_ReconnectStillGetsFullMap(t *testing.T) { // readInitialSettings — they error on nil NetworkMap. Without extra opt-in // signalling there is no way for the server to know this is a GetNetworkMap // call rather than a main Sync, so the server's fast path would break them - // on reconnect. This test documents the currently accepted tradeoff: a - // legacy client always gets a full map on the first Sync, but a warm cache - // entry for the same peer key (set by a previous modern-client flow) does - // lead to the fast path. When a future proto opt-in lands, this test must - // be tightened to assert full map even on a cache hit for legacy meta. - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + // on reconnect. This test pins the currently accepted tradeoff: a legacy + // v0.40 client gets a full map on the first Sync but a reconnect with an + // unchanged metaHash hits the primed cache and goes through the fast path. + // When a future proto opt-in lets the server distinguish these clients, + // this assertion must be tightened to require.NotNil(second.NetworkMap). 
+ skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -110,16 +111,19 @@ func TestSync_WireFixture_LegacyClient_ReconnectStillGetsFullMap(t *testing.T) { require.NoError(t, err) first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs) - cancel1() require.NotNil(t, first.NetworkMap, "first legacy sync receives full map and primes cache") + cancel1() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) - // Give server-side handleUpdates time to tear down the first stream before - // we reopen for the same peer. - time.Sleep(50 * time.Millisecond) + second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs) + defer cancel2() + require.Nil(t, second.NetworkMap, "documented legacy-reconnect tradeoff: warm cache entry causes fast path; update when proto opt-in lands") + require.NotNil(t, second.NetbirdConfig, "fast path still delivers NetbirdConfig") } func TestSync_WireFixture_AndroidReconnect_NeverSkips(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -139,7 +143,7 @@ func TestSync_WireFixture_AndroidReconnect_NeverSkips(t *testing.T) { first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs) require.NotNil(t, first.NetworkMap, "android first sync must deliver a full map") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs) defer cancel2() @@ -147,7 +151,8 @@ func TestSync_WireFixture_AndroidReconnect_NeverSkips(t 
*testing.T) { } func TestSync_WireFixture_ModernClientReconnect_TakesFastPath(t *testing.T) { - mgmtServer, _, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) + skipOnWindows(t) + mgmtServer, am, addr, cleanup, err := startManagementForTest(t, "testdata/store_with_expired_peers.sql", fastPathTestConfig(t)) require.NoError(t, err) defer cleanup() defer mgmtServer.GracefulStop() @@ -167,7 +172,7 @@ func TestSync_WireFixture_ModernClientReconnect_TakesFastPath(t *testing.T) { first, cancel1 := sendWireFixture(t, client, *serverKey, *keys[0], abs) require.NotNil(t, first.NetworkMap, "modern first sync primes cache") cancel1() - waitForPeerDisconnect() + waitForPeerDisconnect(t, am, keys[0].PublicKey().String()) second, cancel2 := sendWireFixture(t, client, *serverKey, *keys[0], abs) defer cancel2()