Add OnPeerConnectedWithPeer to optimize sync fast path operations

This commit is contained in:
mlsmaycon
2026-04-22 22:40:31 +02:00
parent 53deabbdb4
commit 617ceab2e3
4 changed files with 43 additions and 31 deletions

View File

@@ -169,7 +169,7 @@ func (s *Server) tryFastPathSync(
}
log.WithContext(ctx).Debugf("fast path: eligibility check (hit) took %s", time.Since(eligibilityStart))
peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart, network.CurrentSerial())
peer, updates, committed := s.commitFastPath(ctx, accountID, peerKey, realIP, syncStart)
if !committed {
return false, nil
}
@@ -177,21 +177,19 @@ func (s *Server) tryFastPathSync(
return true, s.runFastPathSync(ctx, reqStart, syncStart, accountID, peerKey, peer, updates, peerMetaHash, srv, unlock)
}
// commitFastPath fetches the peer, subscribes it to network-map updates,
// re-checks the account serial to close the race between the eligibility
// check and the subscription, and only then commits MarkPeerConnected. If
// the serial advanced in the race window the update-channel subscription is
// torn down (no MarkPeerConnected is written, so the slow path is free to
// run its own SyncAndMarkPeer cleanly) and the caller falls back to the
// slow path. Returns committed=false on any failure that should not block
// the slow path from running.
// commitFastPath fetches the peer, subscribes it to network-map updates and
// marks the peer connected. It relies on the same eventual-consistency
// guarantee as the slow path: a concurrent writer's broadcast may race the
// subscription, but any subsequent serial change reaches the subscribed peer
// via its update channel, and a reconnect with a stale cached serial falls
// through to the slow path on the next Sync. Returns committed=false on any
// failure that should not block the slow path from running.
func (s *Server) commitFastPath(
ctx context.Context,
accountID string,
peerKey wgtypes.Key,
realIP net.IP,
syncStart time.Time,
expectedSerial uint64,
) (*nbpeer.Peer, chan *network_map.UpdateMessage, bool) {
commitStart := time.Now()
defer func() {
@@ -207,27 +205,12 @@ func (s *Server) commitFastPath(
log.WithContext(ctx).Debugf("fast path: GetPeerByPeerPubKey took %s", time.Since(getPeerStart))
onConnectedStart := time.Now()
updates, err := s.networkMapController.OnPeerConnected(ctx, accountID, peer.ID)
updates, err := s.networkMapController.OnPeerConnectedWithPeer(ctx, accountID, peer)
if err != nil {
log.WithContext(ctx).Debugf("fast path: notify peer connected for %s: %v", peerKey.String(), err)
return nil, nil, false
}
log.WithContext(ctx).Debugf("fast path: OnPeerConnected took %s", time.Since(onConnectedStart))
recheckStart := time.Now()
network, err := s.accountManager.GetStore().GetAccountNetwork(ctx, store.LockingStrengthNone, accountID)
if err != nil {
log.WithContext(ctx).Debugf("fast path: re-check account network: %v", err)
s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID)
return nil, nil, false
}
log.WithContext(ctx).Debugf("fast path: re-check GetAccountNetwork took %s", time.Since(recheckStart))
if network.CurrentSerial() != expectedSerial {
log.WithContext(ctx).Debugf("fast path: serial advanced from %d to %d after subscribe, falling back to slow path for peer %s", expectedSerial, network.CurrentSerial(), peerKey.String())
s.networkMapController.OnPeerDisconnected(ctx, accountID, peer.ID)
return nil, nil, false
}
log.WithContext(ctx).Debugf("fast path: OnPeerConnectedWithPeer took %s", time.Since(onConnectedStart))
markStart := time.Now()
if err := s.accountManager.MarkPeerConnected(ctx, peerKey.String(), true, realIP, accountID, syncStart); err != nil {