diff --git a/management/internals/shared/grpc/sync_fast_path.go b/management/internals/shared/grpc/sync_fast_path.go index 5487d5c05..fe339bda4 100644 --- a/management/internals/shared/grpc/sync_fast_path.go +++ b/management/internals/shared/grpc/sync_fast_path.go @@ -106,18 +106,19 @@ func buildFastPathResponse( nbConfig := toNetbirdConfig(cfg, turnToken, relayToken, extraSettings) var peerGroups []string - start := time.Now() if fetchGroups != nil { + start := time.Now() if ids, err := fetchGroups(ctx, peer.AccountID, peer.ID); err != nil { log.WithContext(ctx).Debugf("fast path: get peer group ids: %v", err) } else { peerGroups = ids } - log.WithContext(ctx).Debugf("fast path: get peer group took %s", time.Since(start)) + log.WithContext(ctx).Debugf("fast path: get peer groups took %s", time.Since(start)) } - start = time.Now() + + extendStart := time.Now() nbConfig = integrationsConfig.ExtendNetBirdConfig(peer.ID, peerGroups, nbConfig, extraSettings) - log.WithContext(ctx).Debugf("fast path: ExtendNetBirdConfig took %s", time.Since(start)) + log.WithContext(ctx).Debugf("fast path: ExtendNetBirdConfig took %s", time.Since(extendStart)) return &proto.SyncResponse{NetbirdConfig: nbConfig} } @@ -140,7 +141,6 @@ func (s *Server) tryFastPathSync( srv proto.ManagementService_SyncServer, unlock *func(), ) (took bool, err error) { - start := time.Now() if s.peerSerialCache == nil { return false, nil } @@ -151,22 +151,21 @@ func (s *Server) tryFastPathSync( return false, nil } + networkStart := time.Now() network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) if err != nil { log.WithContext(ctx).Debugf("fast path: lookup account network: %v", err) return false, nil } + log.WithContext(ctx).Debugf("fast path: initial GetAccountNetwork took %s", time.Since(networkStart)) - log.WithContext(ctx).Debugf("fast path: GetAccountNetwork peer took %s", time.Since(start)) - start = time.Now() - + eligibilityStart := time.Now() 
cached, hit := s.peerSerialCache.Get(peerKey.String()) if !shouldSkipNetworkMap(peerMeta.GoOS, hit, cached, network.CurrentSerial(), peerMetaHash) { - log.WithContext(ctx).Debugf("fast path: checking for fast path eligibility for peer took %s", time.Since(reqStart)) + log.WithContext(ctx).Debugf("fast path: eligibility check (miss) took %s", time.Since(eligibilityStart)) return false, nil } - - log.WithContext(ctx).Debugf("fast path: checking for fast path eligibility for peer took %s", time.Since(start)) + log.WithContext(ctx).Debugf("fast path: eligibility check (hit) took %s", time.Since(eligibilityStart)) peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, network.CurrentSerial()) if !committed { @@ -176,12 +175,14 @@ func (s *Server) tryFastPathSync( return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, peerMetaHash, srv, unlock) } -// commitFastPath marks the peer connected, subscribes it to network-map -// updates, then re-checks the account serial to close the race between the -// eligibility check and the subscription. If the serial advanced in that -// window the subscription is torn down and the caller falls back to the slow -// path so the peer receives the new state. Returns committed=false on any -// failure that should not block the slow path from running. +// commitFastPath fetches the peer, subscribes it to network-map updates, +// re-checks the account serial to close the race between the eligibility +// check and the subscription, and only then calls MarkPeerConnected. If the +// re-check fails or the serial advanced in the race window, the subscription +// is torn down (no MarkPeerConnected is written, so the slow path is free to +// run its own SyncAndMarkPeer cleanly) and the caller falls back to the slow +// path. A MarkPeerConnected error is only logged; committed stays true. +// Returns committed=false on any failure that should not block the slow path. 
func (s *Server) commitFastPath( ctx context.Context, accountID string, @@ -190,50 +191,48 @@ func (s *Server) commitFastPath( syncStart time.Time, expectedSerial uint64, ) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) { - start := time.Now() - cp := start - defer func() { log.WithContext(ctx).Debugf("fast path: commitFastPath took %s", time.Since(cp)) }() - - if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil { - log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err) - } - - log.WithContext(ctx).Debugf("fast path: mark peer connected took %s", time.Since(start)) - start = time.Now() + commitStart := time.Now() + defer func() { + log.WithContext(ctx).Debugf("fast path: commitFastPath took %s", time.Since(commitStart)) + }() + getPeerStart := time.Now() peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String()) if err != nil { log.WithContext(ctx).Debugf("fast path: lookup peer %s: %v", peerKey.String(), err) return nil, nil, false } + log.WithContext(ctx).Debugf("fast path: GetPeerByPeerPubKey took %s", time.Since(getPeerStart)) - log.WithContext(ctx).Debugf("fast path: get peer took %s", time.Since(start)) - start = time.Now() - + onConnectedStart := time.Now() updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID) if err != nil { log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err) return nil, nil, false } + log.WithContext(ctx).Debugf("fast path: OnPeerConnected took %s", time.Since(onConnectedStart)) - log.WithContext(ctx).Debugf("fast path: on peer connected took %s", time.Since(start)) - start = time.Now() - + recheckStart := time.Now() network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID) if err != nil { log.WithContext(ctx).Debugf("fast path: re-check account network: %v", 
err) - s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) + s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID) return nil, nil, false } - - log.WithContext(ctx).Debugf("fast path: get network took %s", time.Since(start)) + log.WithContext(ctx).Debugf("fast path: re-check GetAccountNetwork took %s", time.Since(recheckStart)) if network.CurrentSerial() != expectedSerial { log.WithContext(ctx).Debugf("fast path: serial advanced from %d to %d after subscribe, falling back to slow path for peer %s", expectedSerial, network.CurrentSerial(), peerKey.String()) - s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) + s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID) return nil, nil, false } + markStart := time.Now() + if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil { + log.WithContext(ctx).Warnf("fast path: mark connected for peer %s: %v", peerKey.String(), err) + } + log.WithContext(ctx).Debugf("fast path: MarkPeerConnected took %s", time.Since(markStart)) + return peer, updates, true } @@ -253,15 +252,14 @@ func (s *Server) runFastPathSync( srv proto.ManagementService_SyncServer, unlock *func(), ) error { - start := time.Now() + sendStart := time.Now() if err := s.sendFastPathResponse(ctx, peerKey, peer, srv); err != nil { log.WithContext(ctx).Debugf("fast path: send response for peer %s: %v", peerKey.String(), err) s.syncSem.Add(-1) s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, syncStart) return err } - - log.WithContext(ctx).Debugf("fast path: mark peer connected took %s", time.Since(start)) + log.WithContext(ctx).Debugf("fast path: sendFastPathResponse took %s", time.Since(sendStart)) s.secretsManager.SetupRefresh(ctx, accountID, peer.ID) @@ -273,7 +271,6 @@ func (s *Server) runFastPathSync( if s.appMetrics != nil { s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID) } - log.WithContext(ctx).Debugf("fast 
path: sync took %s", time.Since(start)) log.WithContext(ctx).Debugf("Sync (fast path) took %s", time.Since(reqStart)) s.syncSem.Add(-1)