Reset connection stage timestamps during reconnections to exclude unnecessary metrics tracking

This commit is contained in:
Zoltán Papp
2026-01-28 15:59:24 +01:00
parent 5169129029
commit cbfde79dd8
4 changed files with 41 additions and 48 deletions

View File

@@ -84,7 +84,7 @@ func (m *victoriaMetrics) RecordConnectionStages(
m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType),
).Update(totalDuration)
log.Infof("--- Peer connection metrics [%s, %s, %s]: creation→semaphore: %.3fs, semaphore→signaling: %.3fs, signaling→connection: %.3fs, connection→handshake: %.3fs, total: %.3fs",
log.Tracef("peer connection metrics [%s, %s, %s]: creation→semaphore: %.3fs, semaphore→signaling: %.3fs, signaling→connection: %.3fs, connection→handshake: %.3fs, total: %.3fs",
m.deploymentType.String(), connTypeStr, attemptType,
creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake,
totalDuration)

View File

@@ -303,7 +303,6 @@ func (conn *Conn) Close(signalToRemote bool) {
}
conn.setStatusToDisconnected()
conn.opened = false
conn.wg.Wait()
conn.Log.Infof("peer connection closed")
@@ -343,7 +342,6 @@ func (conn *Conn) SetRosenpassInitializedPresharedKeyValidator(handler func(peer
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
conn.dumpState.RemoteOffer()
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
conn.handshaker.OnRemoteOffer(offer)
}
@@ -387,7 +385,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
if conn.currentConnPriority > priority {
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.updateIceState(iceConnInfo, time.Now())
return
}
@@ -428,11 +426,14 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
conn.handleConfigurationFailure(err, wgProxy)
return
}
conn.enableWgWatcherIfNeeded(updateTime)
wgConfigWorkaround()
if conn.wgProxyRelay != nil {
@@ -442,8 +443,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
conn.updateIceState(iceConnInfo, updateTime)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
}
func (conn *Conn) onICEStateDisconnected() {
@@ -535,13 +536,13 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.setRelayedProxy(wgProxy)
conn.statusRelay.SetConnected()
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
return
}
wgProxy.Work()
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.Log.Warnf("Failed to close relay connection: %v", err)
@@ -550,14 +551,17 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
return
}
conn.enableWgWatcherIfNeeded(updateTime)
wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay
conn.statusRelay.SetConnected()
conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
conn.Log.Infof("start to communicate with peer via relay")
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
}
func (conn *Conn) onRelayDisconnected() {
@@ -640,10 +644,10 @@ func (conn *Conn) onWGDisconnected() {
}
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: conn.isRelayed(),
RelayServerAddress: relayServerAddr,
@@ -656,10 +660,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
}
}
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: iceConnInfo.Relayed,
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
@@ -697,12 +701,12 @@ func (conn *Conn) setStatusToDisconnected() {
}
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
if runtime.GOOS == "ios" {
runtime.GC()
}
conn.metricsStages.RecordConnectionReady()
conn.metricsStages.RecordConnectionReady(updateTime)
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
@@ -768,15 +772,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
return true
}
func (conn *Conn) enableWgWatcherIfNeeded() {
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
if !conn.wgWatcher.IsEnabled() {
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
conn.wgWatcherCancel = wgWatcherCancel
conn.wgWatcherWg.Add(1)
now := time.Now()
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, now, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
}()
}
}
@@ -848,9 +851,7 @@ func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
log.Infof("--- record Metrics")
if conn.metricsRecorder == nil {
log.Infof("--- is nil")
return
}
@@ -863,7 +864,6 @@ func (conn *Conn) recordConnectionMetrics() {
connType = metrics.ConnectionTypeICE
}
log.Infof("-- record: connType: %v, %v, %v", connType, conn.metricsStages.IsReconnection(), conn.metricsStages.GetTimestamps())
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),

View File

@@ -4,8 +4,6 @@ import (
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics"
)
@@ -18,14 +16,12 @@ type MetricsStages struct {
func (s *MetricsStages) RecordCreated() {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- RecordCreated")
s.stageTimestamps.Created = time.Now()
}
func (s *MetricsStages) RecordSemaphoreAcquired() {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- RecordSemaphoreAcquired")
s.stageTimestamps.SemaphoreAcquired = time.Now()
}
@@ -35,14 +31,12 @@ func (s *MetricsStages) RecordSemaphoreAcquired() {
func (s *MetricsStages) RecordSignaling() {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- RecordSignaling (send)")
if s.isReconnectionAttempt {
return
}
if s.stageTimestamps.Signaling.IsZero() {
log.Infof("--- Recorded Signaling (initial connection, sending)")
s.stageTimestamps.Signaling = time.Now()
}
}
@@ -53,47 +47,47 @@ func (s *MetricsStages) RecordSignaling() {
func (s *MetricsStages) RecordSignalingReceived() {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- RecordSignalingReceived (receive)")
// Only record for reconnections when we receive a signal
if s.isReconnectionAttempt && s.stageTimestamps.Signaling.IsZero() {
log.Infof("--- Recorded Signaling (reconnection, receiving)")
s.stageTimestamps.Signaling = time.Now()
}
}
func (s *MetricsStages) RecordConnectionReady() {
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- RecordConnectionReady")
if s.stageTimestamps.ConnectionReady.IsZero() {
log.Infof("--- Recorded ConnectionReady")
s.stageTimestamps.ConnectionReady = time.Now()
s.stageTimestamps.ConnectionReady = when
}
}
func (s *MetricsStages) RecordWGHandshakeSuccess(elapsed time.Time) {
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
log.Infof("--- record: %v, %v", s.stageTimestamps.ConnectionReady, elapsed)
if !s.stageTimestamps.ConnectionReady.IsZero() {
// todo, check if it is earlier then ConnectionReady
s.stageTimestamps.WgHandshakeSuccess = elapsed
// WireGuard only reports handshake times with second precision, but ConnectionReady
// is captured with microsecond precision. If handshake appears before ConnectionReady
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
// ConnectionReady to avoid negative duration metrics.
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
} else {
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
}
}
}
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
func (s *MetricsStages) Disconnected() {
log.Infof("--- Disconnected")
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
s.stageTimestamps = metrics.ConnectionStageTimestamps{
Created: now,
SemaphoreAcquired: now,
}
// Reset all timestamps for reconnection
// For reconnections, we only track from Signaling onwards
// This avoids meaningless creation→semaphore and semaphore→signaling metrics
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
s.isReconnectionAttempt = true
}

View File

@@ -94,8 +94,7 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
}
if lastHandshake.IsZero() {
elapsed := calcElapsed(enabledTime, *handshake)
w.log.Infof("--- first wg handshake detected within: %.2fsec, (%s - %s)", elapsed, enabledTime, handshake)
onHandshakeSuccessFn(*handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
}
lastHandshake = *handshake