fix(proxy): wire PR #6207 snapshot-apply metrics

The SyncMappings restore in 036e91cde kept the metric definitions
(RecordSnapshotSyncDuration, RecordSnapshotBatchDuration,
RecordAddPeerDuration) and the corresponding callbacks (OnAddPeer)
but lost their call sites — they shipped as dead code.

- proxy/server.go: introduce snapshotTracker (the type PR #6207 added
  to share batch/sync timing between handleMappingStream and
  handleSyncMappingsStream); both stream handlers now go through it.
- proxy/internal/roundtrip/netbird.go: add OnAddPeer struct field and
  invoke it after createClientEntry with the per-call duration.
- proxy/server.go: wire s.netbird.OnAddPeer = s.meter.RecordAddPeerDuration
  alongside the existing NetBird construction.

No new test coverage — PR #6207's bench tests already exercise the
batch/sync paths and continue to pass.
This commit is contained in:
mlsmaycon
2026-05-21 11:25:56 +02:00
parent dd90c0d180
commit 06cc488e90
2 changed files with 67 additions and 44 deletions

View File

@@ -170,6 +170,11 @@ type NetBird struct {
// stopHandler runs when an account's last service is removed (or the
// transport is shutting down). Receives whatever readyHandler returned.
stopHandler func(accountID types.AccountID, state any)
// OnAddPeer, when set, is called after AddPeer completes for a new account
// (i.e. when a new client was actually created, not when an existing one
// was reused). The duration covers keygen + gRPC CreateProxyPeer + embed.New.
OnAddPeer func(d time.Duration, err error)
}
// ClientDebugInfo contains debug information about a client.
@@ -217,7 +222,11 @@ func (n *NetBird) AddPeer(ctx context.Context, accountID types.AccountID, key Se
return nil
}
createStart := time.Now()
entry, err := n.createClientEntry(ctx, accountID, key, authToken, si)
if n.OnAddPeer != nil {
n.OnAddPeer(time.Since(createStart), err)
}
if err != nil {
n.clientsMux.Unlock()
return err

View File

@@ -534,6 +534,7 @@ func (s *Server) initNetBirdClient() {
// the embedded client never accepts inbound, so block.
BlockInbound: !s.Private,
}, s.Logger, s, s.mgmtClient)
s.netbird.OnAddPeer = s.meter.RecordAddPeerDuration
}
// initReverseProxy builds the meter-instrumented reverse proxy. MultiTransport
@@ -1273,17 +1274,14 @@ func isSyncUnimplemented(err error) bool {
// stream, sending an ack after each batch is fully processed. Management waits
// for the ack before sending the next batch, providing application-level
// back-pressure.
func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool, _ time.Time) error {
func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.ProxyService_SyncMappingsClient, initialSyncDone *bool, connectTime time.Time) error {
select {
case <-s.routerReady:
case <-ctx.Done():
return ctx.Err()
}
var snapshotIDs map[types.ServiceID]struct{}
if !*initialSyncDone {
snapshotIDs = make(map[types.ServiceID]struct{})
}
tracker := s.newSnapshotTracker(initialSyncDone, connectTime)
for {
select {
@@ -1297,24 +1295,12 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox
case err != nil:
return fmt.Errorf("receive msg: %w", err)
}
batchStart := time.Now()
s.Logger.Debug("Received mapping update, starting processing")
s.processMappings(ctx, msg.GetMapping())
s.Logger.Debug("Processing mapping update completed")
if !*initialSyncDone {
for _, m := range msg.GetMapping() {
snapshotIDs[types.ServiceID(m.GetId())] = struct{}{}
}
if msg.GetInitialSyncComplete() {
s.reconcileSnapshot(ctx, snapshotIDs)
snapshotIDs = nil
if s.healthChecker != nil {
s.healthChecker.SetInitialSyncComplete()
}
*initialSyncDone = true
s.Logger.Info("Initial mapping sync complete")
}
}
tracker.recordBatch(ctx, s, msg.GetMapping(), msg.GetInitialSyncComplete(), batchStart)
if err := stream.Send(&proto.SyncMappingsRequest{
Msg: &proto.SyncMappingsRequest_Ack{Ack: &proto.SyncMappingsAck{}},
@@ -1325,52 +1311,80 @@ func (s *Server) handleSyncMappingsStream(ctx context.Context, stream proto.Prox
}
}
func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool, _ time.Time) error {
// snapshotTracker accumulates service IDs during the initial snapshot and
// finalises sync state when the complete flag arrives. Used by both
// handleMappingStream and handleSyncMappingsStream so metric emission and
// reconciliation behave identically on either RPC.
type snapshotTracker struct {
done *bool
connectTime time.Time
snapshotIDs map[types.ServiceID]struct{}
}
func (s *Server) newSnapshotTracker(done *bool, connectTime time.Time) *snapshotTracker {
var ids map[types.ServiceID]struct{}
if !*done {
ids = make(map[types.ServiceID]struct{})
}
return &snapshotTracker{done: done, connectTime: connectTime, snapshotIDs: ids}
}
func (t *snapshotTracker) recordBatch(ctx context.Context, s *Server, mappings []*proto.ProxyMapping, syncComplete bool, batchStart time.Time) {
if *t.done {
return
}
if s.meter != nil {
s.meter.RecordSnapshotBatchDuration(time.Since(batchStart))
}
for _, m := range mappings {
t.snapshotIDs[types.ServiceID(m.GetId())] = struct{}{}
}
if !syncComplete {
return
}
s.reconcileSnapshot(ctx, t.snapshotIDs)
t.snapshotIDs = nil
if s.healthChecker != nil {
s.healthChecker.SetInitialSyncComplete()
}
*t.done = true
if s.meter != nil {
s.meter.RecordSnapshotSyncDuration(time.Since(t.connectTime))
}
s.Logger.Info("Initial mapping sync complete")
}
func (s *Server) handleMappingStream(ctx context.Context, mappingClient proto.ProxyService_GetMappingUpdateClient, initialSyncDone *bool, connectTime time.Time) error {
select {
case <-s.routerReady:
case <-ctx.Done():
return ctx.Err()
}
var snapshotIDs map[types.ServiceID]struct{}
if !*initialSyncDone {
snapshotIDs = make(map[types.ServiceID]struct{})
}
tracker := s.newSnapshotTracker(initialSyncDone, connectTime)
for {
// Check for context completion to gracefully shutdown.
select {
case <-ctx.Done():
// Shutting down.
return ctx.Err()
default:
msg, err := mappingClient.Recv()
switch {
case errors.Is(err, io.EOF):
// Mapping connection gracefully terminated by server.
return nil
case err != nil:
// Something has gone horribly wrong, return and hope the parent retries the connection.
return fmt.Errorf("receive msg: %w", err)
}
batchStart := time.Now()
s.Logger.Debug("Received mapping update, starting processing")
s.processMappings(ctx, msg.GetMapping())
s.Logger.Debug("Processing mapping update completed")
if !*initialSyncDone {
for _, m := range msg.GetMapping() {
snapshotIDs[types.ServiceID(m.GetId())] = struct{}{}
}
if msg.GetInitialSyncComplete() {
s.reconcileSnapshot(ctx, snapshotIDs)
snapshotIDs = nil
if s.healthChecker != nil {
s.healthChecker.SetInitialSyncComplete()
}
*initialSyncDone = true
s.Logger.Info("Initial mapping sync complete")
}
}
tracker.recordBatch(ctx, s, msg.GetMapping(), msg.GetInitialSyncComplete(), batchStart)
}
}
}