diff --git a/management/internals/controllers/network_map/controller/controller.go b/management/internals/controllers/network_map/controller/controller.go index 4b414df6f..83a922cc3 100644 --- a/management/internals/controllers/network_map/controller/controller.go +++ b/management/internals/controllers/network_map/controller/controller.go @@ -132,9 +132,18 @@ func (c *Controller) OnPeerConnected(ctx context.Context, accountID string, peer return nil, fmt.Errorf("failed to get peer %s: %v", peerID, err) } - c.EphemeralPeersManager.OnPeerConnected(ctx, peer) + return c.OnPeerConnectedWithPeer(ctx, accountID, peer) +} - return c.peersUpdateManager.CreateChannel(ctx, peerID), nil +// OnPeerConnectedWithPeer is the peer-object variant of OnPeerConnected. It +// skips the internal GetPeerByID and is intended for callers that already +// hold the peer (e.g. the Sync fast path). The accountID parameter is kept +// for symmetry with OnPeerConnected even though the peer object already +// carries it — callers typically have it handy from the surrounding context. +func (c *Controller) OnPeerConnectedWithPeer(ctx context.Context, accountID string, peer *nbpeer.Peer) (chan *network_map.UpdateMessage, error) { + _ = accountID + c.EphemeralPeersManager.OnPeerConnected(ctx, peer) + return c.peersUpdateManager.CreateChannel(ctx, peer.ID), nil } func (c *Controller) OnPeerDisconnected(ctx context.Context, accountID string, peerID string) { diff --git a/management/internals/controllers/network_map/interface.go b/management/internals/controllers/network_map/interface.go index 64caac861..58bc76acd 100644 --- a/management/internals/controllers/network_map/interface.go +++ b/management/internals/controllers/network_map/interface.go @@ -35,6 +35,11 @@ type Controller interface { OnPeersDeleted(ctx context.Context, accountID string, peerIDs []string) error DisconnectPeers(ctx context.Context, accountId string, peerIDs []string) OnPeerConnected(ctx context.Context, accountID string, peerID string) (chan *UpdateMessage, error) + // OnPeerConnectedWithPeer is equivalent to OnPeerConnected but accepts an + // already-fetched peer, skipping the internal GetPeerByID lookup. Intended + // for callers that have already resolved the peer (e.g. the Sync fast path) + // so the controller does not re-read what the caller just read. + OnPeerConnectedWithPeer(ctx context.Context, accountID string, peer *nbpeer.Peer) (chan *UpdateMessage, error) OnPeerDisconnected(ctx context.Context, accountID string, peerID string) TrackEphemeralPeer(ctx context.Context, peer *nbpeer.Peer) diff --git a/management/internals/controllers/network_map/interface_mock.go b/management/internals/controllers/network_map/interface_mock.go index 4e86d2973..88e87262a 100644 --- a/management/internals/controllers/network_map/interface_mock.go +++ b/management/internals/controllers/network_map/interface_mock.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: management/internals/controllers/network_map/interface.go +// Source: ./interface.go // // Generated by this command: // -// mockgen -package network_map -destination=management/internals/controllers/network_map/interface_mock.go -source=management/internals/controllers/network_map/interface.go -build_flags=-mod=mod +// mockgen -package network_map -destination=interface_mock.go -source=./interface.go -build_flags=-mod=mod // // Package network_map is a generated GoMock package. @@ -145,6 +145,21 @@ func (mr *MockControllerMockRecorder) OnPeerConnected(ctx, accountID, peerID any return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPeerConnected", reflect.TypeOf((*MockController)(nil).OnPeerConnected), ctx, accountID, peerID) } +// OnPeerConnectedWithPeer mocks base method. +func (m *MockController) OnPeerConnectedWithPeer(ctx context.Context, accountID string, arg2 *peer.Peer) (chan *UpdateMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OnPeerConnectedWithPeer", ctx, accountID, arg2) + ret0, _ := ret[0].(chan *UpdateMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// OnPeerConnectedWithPeer indicates an expected call of OnPeerConnectedWithPeer. +func (mr *MockControllerMockRecorder) OnPeerConnectedWithPeer(ctx, accountID, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnPeerConnectedWithPeer", reflect.TypeOf((*MockController)(nil).OnPeerConnectedWithPeer), ctx, accountID, arg2) +} + // OnPeerDisconnected mocks base method. func (m *MockController) OnPeerDisconnected(ctx context.Context, accountID, peerID string) { m.ctrl.T.Helper() diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index 354e8defe..01adb850c 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -169,7 +169,7 @@ func (s *Server) tryFastPathSync( } log.WithContext(ctx).Debugf("fast path: eligibility check (hit) took %s", time.Since(eligibilityStart)) - peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, network.CurrentSerial()) + peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart) if !committed { return false, nil } @@ -177,21 +177,19 @@ func (s *Server) tryFastPathSync( return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, peerMetaHash, srv, unlock) } -// commitFastPath fetches the peer, subscribes it to network-map updates, -// re-checks the account serial to close the race between the eligibility -// check and the subscription, and only then commits MarkPeerConnected. If -// the serial advanced in the race window the update-channel subscription is -// torn down (no MarkPeerConnected is written, so the slow path is free to -// run its own SyncAndMarkPeer cleanly) and the caller falls back to the -// slow path. Returns committed=false on any failure that should not block -// the slow path from running. +// commitFastPath fetches the peer, subscribes it to network-map updates and +// marks the peer connected. It relies on the same eventual-consistency +// guarantee as the slow path: a concurrent writer's broadcast may race the +// subscription, but any subsequent serial change reaches the subscribed peer +// via its update channel, and a reconnect with a stale cached serial falls +// through to the slow path on the next Sync. Returns committed=false on any +// failure that should not block the slow path from running. func (s *Server) commitFastPath( ctx context.Context, accountID string, peerKey wgtypes.Key, realIP net.IP, syncStart time.Time, - expectedSerial uint64, ) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) { commitStart := time.Now() defer func() { @@ -207,27 +205,12 @@ func (s *Server) commitFastPath( log.WithContext(ctx).Debugf("fast path: GetPeerByPeerPubKey took %s", time.Since(getPeerStart)) onConnectedStart := time.Now() - updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID) + updates, err := s.networkMapController.OnPeerConnectedWithPeer(ctx, accountID, peer) if err != nil { log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err) return nil, nil, false } - log.WithContext(ctx).Debugf("fast path: OnPeerConnected took %s", time.Since(onConnectedStart)) - - recheckStart := time.Now() - network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) - if err != nil { - log.WithContext(ctx).Debugf("fast path: re-check account network: %v", err) - s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID) - return nil, nil, false - } - log.WithContext(ctx).Debugf("fast path: re-check GetAccountNetwork took %s", time.Since(recheckStart)) - - if network.CurrentSerial() != expectedSerial { - log.WithContext(ctx).Debugf("fast path: serial advanced from %d to %d after subscribe, falling back to slow path for peer %s", expectedSerial, network.CurrentSerial(), peerKey.String()) - s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID) - return nil, nil, false - } + log.WithContext(ctx).Debugf("fast path: OnPeerConnectedWithPeer took %s", time.Since(onConnectedStart)) markStart := time.Now() if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil {