diff --git a/client/internal/connect.go b/client/internal/connect.go index 7ca3d8f49..dad2e6b6b 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -22,7 +22,6 @@ import ( "github.com/netbirdio/netbird/client/iface/device" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/listener" - "github.com/netbirdio/netbird/client/internal/metrics" "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/profilemanager" "github.com/netbirdio/netbird/client/internal/statemanager" @@ -53,7 +52,6 @@ type ConnectClient struct { engineMutex sync.Mutex persistSyncResponse bool - clientMetrics *metrics.ClientMetrics } func NewConnectClient( @@ -61,7 +59,6 @@ func NewConnectClient( config *profilemanager.Config, statusRecorder *peer.Status, doInitalAutoUpdate bool, - clientMetrics *metrics.ClientMetrics, ) *ConnectClient { return &ConnectClient{ ctx: ctx, @@ -69,7 +66,6 @@ func NewConnectClient( statusRecorder: statusRecorder, doInitialAutoUpdate: doInitalAutoUpdate, engineMutex: sync.Mutex{}, - clientMetrics: clientMetrics, } } @@ -311,7 +307,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan checks := loginResp.GetChecks() c.engineMutex.Lock() - engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, stateManager, c.clientMetrics) + engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, stateManager) engine.SetSyncResponsePersistence(c.persistSyncResponse) c.engine = engine c.engineMutex.Unlock() diff --git a/client/internal/engine.go b/client/internal/engine.go index 439058afc..7e6a933f4 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -228,7 +228,13 @@ type localIpUpdater interface { } // NewEngine creates a new Connection Engine with probes attached -func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, stateManager *statemanager.Manager, clientMetrics *metrics.ClientMetrics) *Engine { +func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, stateManager *statemanager.Manager) *Engine { + // Initialize metrics based on deployment type + var deploymentType metrics.DeploymentType + if mgmClient != nil { + deploymentType = metrics.DetermineDeploymentType(mgmClient.GetServerURL()) + } + engine := &Engine{ clientCtx: clientCtx, clientCancel: clientCancel, @@ -248,7 +254,7 @@ func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signa checks: checks, connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL), - clientMetrics: clientMetrics, + clientMetrics: metrics.NewClientMetrics(deploymentType, true), } log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String()) @@ -294,11 +300,6 @@ func (e *Engine) Stop() error { e.updateManager.Stop() } - // Update metrics engine status - if e.clientMetrics != nil { - e.clientMetrics.SetEngineStatus(0) // 0=stopped - } - log.Info("cleaning up status recorder states") e.statusRecorder.ReplaceOfflinePeers([]peer.State{}) e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{}) @@ -529,11 +530,6 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) } }() - // Update metrics engine status - if e.clientMetrics != nil { - e.clientMetrics.SetEngineStatus(1) // 1=running - } - return nil } @@ -1400,12 +1396,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV } serviceDependencies := peer.ServiceDependencies{ - StatusRecorder: e.statusRecorder, - Signaler: e.signaler, - IFaceDiscover: e.mobileDep.IFaceDiscover, - RelayManager: e.relayManager, - SrWatcher: e.srWatcher, - Semaphore: e.connSemaphore, + StatusRecorder: e.statusRecorder, + Signaler: e.signaler, + IFaceDiscover: e.mobileDep.IFaceDiscover, + RelayManager: e.relayManager, + SrWatcher: e.srWatcher, + Semaphore: e.connSemaphore, + MetricsRecorder: e.clientMetrics, } peerConn, err := peer.NewConn(config, serviceDependencies) if err != nil { @@ -1689,6 +1686,11 @@ func (e *Engine) GetFirewallManager() firewallManager.Manager { return e.firewall } +// GetClientMetrics returns the client metrics +func (e *Engine) GetClientMetrics() *metrics.ClientMetrics { + return e.clientMetrics +} + func findIPFromInterfaceName(ifaceName string) (net.IP, error) { iface, err := net.InterfaceByName(ifaceName) if err != nil { diff --git a/client/internal/metrics/connection_type.go b/client/internal/metrics/connection_type.go new file mode 100644 index 000000000..a3406a6b8 --- /dev/null +++ b/client/internal/metrics/connection_type.go @@ -0,0 +1,17 @@ +package metrics + +// ConnectionType represents the type of peer connection +type ConnectionType string + +const ( + // ConnectionTypeICE represents a direct peer-to-peer connection using ICE + ConnectionTypeICE ConnectionType = "ice" + + // ConnectionTypeRelay represents a relayed connection + ConnectionTypeRelay ConnectionType = "relay" +) + +// String returns the string representation of the connection type +func (c ConnectionType) String() string { + return string(c) +} diff --git a/client/internal/metrics/deployment_type.go b/client/internal/metrics/deployment_type.go new file mode 100644 index 000000000..e1d9e8d53 --- /dev/null +++ b/client/internal/metrics/deployment_type.go @@ -0,0 +1,46 @@ +package metrics + +import ( + "strings" +) + +// DeploymentType represents the type of NetBird deployment +type DeploymentType int + +const ( + // DeploymentTypeUnknown represents an unknown or uninitialized deployment type + DeploymentTypeUnknown DeploymentType = iota + + // DeploymentTypeCloud represents a cloud-hosted NetBird deployment + DeploymentTypeCloud + + // DeploymentTypeSelfHosted represents a self-hosted NetBird deployment + DeploymentTypeSelfHosted +) + +// String returns the string representation of the deployment type +func (d DeploymentType) String() string { + switch d { + case DeploymentTypeCloud: + return "cloud" + case DeploymentTypeSelfHosted: + return "selfhosted" + default: + return "selfhosted" + } +} + +// DetermineDeploymentType determines if the deployment is cloud or self-hosted +// based on the management URL string +func DetermineDeploymentType(managementURL string) DeploymentType { + if managementURL == "" { + return DeploymentTypeUnknown + } + + // Check for NetBird cloud API domain + if strings.Contains(strings.ToLower(managementURL), "api.netbird.io") { + return DeploymentTypeCloud + } + + return DeploymentTypeSelfHosted +} diff --git a/client/internal/metrics/metrics.go b/client/internal/metrics/metrics.go index b1322c532..787ffc1ac 100644 --- a/client/internal/metrics/metrics.go +++ b/client/internal/metrics/metrics.go @@ -1,32 +1,62 @@ package metrics import ( + "context" "io" - - "github.com/VictoriaMetrics/metrics" + "time" ) -// ClientMetrics holds all client-side metrics +// metricsImplementation defines the internal interface for metrics implementations +type metricsImplementation interface { + // RecordConnectionStages records connection stage metrics from timestamps + RecordConnectionStages( + ctx context.Context, + connectionType ConnectionType, + isReconnection bool, + timestamps ConnectionStageTimestamps, + ) + + // Export exports metrics in Prometheus format + Export(w io.Writer) error +} + type ClientMetrics struct { - // ICE negotiation metrics - iceNegotiationDuration *metrics.Histogram + impl metricsImplementation +} + +// ConnectionStageTimestamps holds timestamps for each connection stage +type ConnectionStageTimestamps struct { + Created time.Time + SemaphoreAcquired time.Time + Signaling time.Time // First signal sent (initial) or signal received (reconnection) + ConnectionReady time.Time + WgHandshakeSuccess time.Time } // NewClientMetrics creates a new ClientMetrics instance -func NewClientMetrics() *ClientMetrics { - return &ClientMetrics{ - // ICE negotiation metrics - iceNegotiationDuration: metrics.NewHistogram(`netbird_client_ice_negotiation_duration_seconds`), +// If enabled is true, uses an OpenTelemetry implementation +// If enabled is false, uses a no-op implementation +func NewClientMetrics(deploymentType DeploymentType, enabled bool) *ClientMetrics { + var impl metricsImplementation + if !enabled { + impl = &noopMetrics{} + } else { + impl = newOtelMetrics(deploymentType) } + return &ClientMetrics{impl: impl} } -// RecordICENegotiationDuration records the time taken for ICE negotiation -func (m *ClientMetrics) RecordICENegotiationDuration(seconds float64) { - m.iceNegotiationDuration.Update(seconds) +// RecordConnectionStages calculates stage durations from timestamps and records them +func (c *ClientMetrics) RecordConnectionStages( + ctx context.Context, + connectionType ConnectionType, + isReconnection bool, + timestamps ConnectionStageTimestamps, +) { + c.impl.RecordConnectionStages(ctx, connectionType, isReconnection, timestamps) } -// Export writes all metrics in Prometheus format to the provided writer -func (m *ClientMetrics) Export(w io.Writer) error { - metrics.WritePrometheus(w, true) - return nil +// Export exports metrics to the writer +func (c *ClientMetrics) Export(w io.Writer) error { + return c.impl.Export(w) } diff --git a/client/internal/metrics/noop.go b/client/internal/metrics/noop.go new file mode 100644 index 000000000..bf8aa4320 --- /dev/null +++ b/client/internal/metrics/noop.go @@ -0,0 +1,22 @@ +package metrics + +import ( + "context" + "io" +) + +// noopMetrics is a no-op implementation of metricsImplementation +type noopMetrics struct{} + +func (s *noopMetrics) RecordConnectionStages( + _ context.Context, + _ ConnectionType, + _ bool, + _ ConnectionStageTimestamps, +) { + // No-op +} + +func (s *noopMetrics) Export(_ io.Writer) error { + return nil +} diff --git a/client/internal/metrics/otel.go b/client/internal/metrics/otel.go new file mode 100644 index 000000000..43bb12232 --- /dev/null +++ b/client/internal/metrics/otel.go @@ -0,0 +1,250 @@ +package metrics + +import ( + "context" + "fmt" + "io" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// otelMetrics is the OpenTelemetry implementation of ClientMetrics +type otelMetrics struct { + reader *sdkmetric.ManualReader + meterProvider *sdkmetric.MeterProvider + meter metric.Meter + + // Static attributes applied to all metrics + deploymentType DeploymentType + + // Connection stage duration histograms + stageCreationToSemaphore metric.Float64Histogram + stageSemaphoreToSignaling metric.Float64Histogram + stageSignalingToConnection metric.Float64Histogram + stageConnectionToHandshake metric.Float64Histogram + stageTotalCreationToHandshake metric.Float64Histogram +} + +func newOtelMetrics(deploymentType DeploymentType) metricsImplementation { + reader := sdkmetric.NewManualReader() + meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + + otel.SetMeterProvider(meterProvider) + + meter := meterProvider.Meter("netbird.client") + + stageCreationToSemaphore, err := meter.Float64Histogram( + "netbird.peer.connection.stage.creation_to_semaphore", + metric.WithDescription("Duration from connection creation to semaphore acquisition"), + metric.WithUnit("s"), + ) + if err != nil { + return &noopMetrics{} + } + + stageSemaphoreToSignaling, err := meter.Float64Histogram( + "netbird.peer.connection.stage.semaphore_to_signaling", + metric.WithDescription("Duration from semaphore acquisition to signaling start"), + metric.WithUnit("s"), + ) + if err != nil { + return &noopMetrics{} + } + + stageSignalingToConnection, err := meter.Float64Histogram( + "netbird.peer.connection.stage.signaling_to_connection", + metric.WithDescription("Duration from signaling start to connection ready"), + metric.WithUnit("s"), + ) + if err != nil { + return &noopMetrics{} + } + + stageConnectionToHandshake, err := meter.Float64Histogram( + "netbird.peer.connection.stage.connection_to_handshake", + metric.WithDescription("Duration from connection ready to WireGuard handshake success"), + metric.WithUnit("s"), + ) + if err != nil { + return &noopMetrics{} + } + + stageTotalCreationToHandshake, err := meter.Float64Histogram( + "netbird.peer.connection.total.creation_to_handshake", + metric.WithDescription("Total duration from connection creation to WireGuard handshake success"), + metric.WithUnit("s"), + ) + if err != nil { + return &noopMetrics{} + } + + return &otelMetrics{ + reader: reader, + meterProvider: meterProvider, + meter: meter, + deploymentType: deploymentType, + stageCreationToSemaphore: stageCreationToSemaphore, + stageSemaphoreToSignaling: stageSemaphoreToSignaling, + stageSignalingToConnection: stageSignalingToConnection, + stageConnectionToHandshake: stageConnectionToHandshake, + stageTotalCreationToHandshake: stageTotalCreationToHandshake, + } +} + +// RecordConnectionStages records the duration of each connection stage from timestamps +func (m *otelMetrics) RecordConnectionStages( + ctx context.Context, + connectionType ConnectionType, + isReconnection bool, + timestamps ConnectionStageTimestamps, +) { + // Calculate stage durations + var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64 + + if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() { + creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds() + } + + if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() { + semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds() + } + + if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() { + signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds() + } + + if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() { + connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds() + } + + if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() { + totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds() + } + + // Determine attempt type + attemptType := "initial" + if isReconnection { + attemptType = "reconnection" + } + + // Combine deployment type, connection type, and attempt type attributes + attrs := metric.WithAttributes( + attribute.String("deployment_type", m.deploymentType.String()), + attribute.String("connection_type", connectionType.String()), + attribute.String("attempt_type", attemptType), + ) + + m.stageCreationToSemaphore.Record(ctx, creationToSemaphore, attrs) + m.stageSemaphoreToSignaling.Record(ctx, semaphoreToSignaling, attrs) + m.stageSignalingToConnection.Record(ctx, signalingToConnection, attrs) + m.stageConnectionToHandshake.Record(ctx, connectionToHandshake, attrs) + m.stageTotalCreationToHandshake.Record(ctx, totalDuration, attrs) +} + +// Export writes metrics in Prometheus text format +func (m *otelMetrics) Export(w io.Writer) error { + if m.reader == nil { + return fmt.Errorf("metrics reader not initialized") + } + + // Collect current metrics + var rm metricdata.ResourceMetrics + if err := m.reader.Collect(context.Background(), &rm); err != nil { + return fmt.Errorf("failed to collect metrics: %w", err) + } + + // Iterate through scope metrics and write in Prometheus format + for _, sm := range rm.ScopeMetrics { + for _, m := range sm.Metrics { + // Write HELP line + if _, err := fmt.Fprintf(w, "# HELP %s %s\n", m.Name, m.Description); err != nil { + return err + } + + // Write TYPE line + if _, err := fmt.Fprintf(w, "# TYPE %s histogram\n", m.Name); err != nil { + return err + } + + // Handle histogram data + if hist, ok := m.Data.(metricdata.Histogram[float64]); ok { + for _, dp := range hist.DataPoints { + // Build label string from attributes + labelStr := "" + if len(dp.Attributes.ToSlice()) > 0 { + labels := "" + for _, attr := range dp.Attributes.ToSlice() { + if labels != "" { + labels += "," + } + labels += fmt.Sprintf("%s=\"%s\"", attr.Key, attr.Value.AsString()) + } + labelStr = labels + } + + // Write bucket counts + cumulativeCount := uint64(0) + for i, bound := range dp.Bounds { + cumulativeCount += dp.BucketCounts[i] + bucketLabel := labelStr + if bucketLabel != "" { + bucketLabel += "," + } + bucketLabel += fmt.Sprintf("le=\"%g\"", bound) + if _, err := fmt.Fprintf(w, "%s_bucket{%s} %d\n", + m.Name, bucketLabel, cumulativeCount); err != nil { + return err + } + } + + // Write +Inf bucket (last bucket count) + if len(dp.BucketCounts) > len(dp.Bounds) { + cumulativeCount += dp.BucketCounts[len(dp.BucketCounts)-1] + } + bucketLabel := labelStr + if bucketLabel != "" { + bucketLabel += "," + } + bucketLabel += "le=\"+Inf\"" + if _, err := fmt.Fprintf(w, "%s_bucket{%s} %d\n", + m.Name, bucketLabel, cumulativeCount); err != nil { + return err + } + + // Write sum + if labelStr != "" { + if _, err := fmt.Fprintf(w, "%s_sum{%s} %g\n", m.Name, labelStr, dp.Sum); err != nil { + return err + } + } else { + if _, err := fmt.Fprintf(w, "%s_sum %g\n", m.Name, dp.Sum); err != nil { + return err + } + } + + // Write count + if labelStr != "" { + if _, err := fmt.Fprintf(w, "%s_count{%s} %d\n", m.Name, labelStr, dp.Count); err != nil { + return err + } + } else { + if _, err := fmt.Fprintf(w, "%s_count %d\n", m.Name, dp.Count); err != nil { + return err + } + } + } + } + + // Empty line between metrics + if _, err := fmt.Fprintf(w, "\n"); err != nil { + return err + } + } + } + + return nil +} diff --git a/client/internal/metrics/victoria.go b/client/internal/metrics/victoria.go new file mode 100644 index 000000000..0ea953003 --- /dev/null +++ b/client/internal/metrics/victoria.go @@ -0,0 +1,106 @@ +package metrics + +import ( + "context" + "fmt" + "io" + + "github.com/VictoriaMetrics/metrics" +) + +// victoriaMetrics is the VictoriaMetrics implementation of ClientMetrics +type victoriaMetrics struct { + // Static attributes applied to all metrics + deploymentType DeploymentType + + // Metrics set for managing all metrics + set *metrics.Set +} + +func newVictoriaMetrics(deploymentType DeploymentType) metricsImplementation { + return &victoriaMetrics{ + deploymentType: deploymentType, + set: metrics.NewSet(), + } +} + +// RecordConnectionStages records the duration of each connection stage from timestamps +func (m *victoriaMetrics) RecordConnectionStages( + ctx context.Context, + connectionType ConnectionType, + isReconnection bool, + timestamps ConnectionStageTimestamps, +) { + // Calculate stage durations + var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64 + + if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() { + creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds() + } + + if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() { + semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds() + } + + if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() { + signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds() + } + + if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() { + connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds() + } + + if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() { + totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds() + } + + // Determine attempt type + attemptType := "initial" + if isReconnection { + attemptType = "reconnection" + } + + connTypeStr := connectionType.String() + + // Record observations using histograms + m.set.GetOrCreateHistogram( + m.getMetricName("netbird_peer_connection_stage_creation_to_semaphore", connTypeStr, attemptType), + ).Update(creationToSemaphore) + + m.set.GetOrCreateHistogram( + m.getMetricName("netbird_peer_connection_stage_semaphore_to_signaling", connTypeStr, attemptType), + ).Update(semaphoreToSignaling) + + m.set.GetOrCreateHistogram( + m.getMetricName("netbird_peer_connection_stage_signaling_to_connection", connTypeStr, attemptType), + ).Update(signalingToConnection) + + m.set.GetOrCreateHistogram( + m.getMetricName("netbird_peer_connection_stage_connection_to_handshake", connTypeStr, attemptType), + ).Update(connectionToHandshake) + + m.set.GetOrCreateHistogram( + m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType), + ).Update(totalDuration) +} + +// getMetricName constructs a metric name with labels +func (m *victoriaMetrics) getMetricName(baseName, connectionType, attemptType string) string { + return fmt.Sprintf(`%s{deployment_type=%q,connection_type=%q,attempt_type=%q}`, + baseName, + m.deploymentType.String(), + connectionType, + attemptType, + ) +} + +// Export writes metrics in Prometheus text format +func (m *victoriaMetrics) Export(w io.Writer) error { + if m.set == nil { + return fmt.Errorf("metrics set not initialized") + } + + // Write metrics in Prometheus format + m.set.WritePrometheus(w) + return nil +} diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 20a2eb342..0efd95918 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -16,6 +16,7 @@ import ( "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/wgproxy" + "github.com/netbirdio/netbird/client/internal/metrics" "github.com/netbirdio/netbird/client/internal/peer/conntype" "github.com/netbirdio/netbird/client/internal/peer/dispatcher" "github.com/netbirdio/netbird/client/internal/peer/guard" @@ -28,6 +29,16 @@ import ( semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" ) +// MetricsRecorder is an interface for recording peer connection metrics +type MetricsRecorder interface { + RecordConnectionStages( + ctx context.Context, + connectionType metrics.ConnectionType, + isReconnection bool, + timestamps metrics.ConnectionStageTimestamps, + ) +} + type ServiceDependencies struct { StatusRecorder *Status Signaler *Signaler @@ -36,6 +47,7 @@ type ServiceDependencies struct { SrWatcher *guard.SRWatcher Semaphore *semaphoregroup.SemaphoreGroup PeerConnDispatcher *dispatcher.ConnectionDispatcher + MetricsRecorder MetricsRecorder } type WgConfig struct { @@ -98,6 +110,7 @@ type Conn struct { workerICE *WorkerICE workerRelay *WorkerRelay + wgWatcher *WGWatcher wgWatcherWg sync.WaitGroup // used to store the remote Rosenpass key for Relayed connection in case of connection update from ice @@ -115,6 +128,13 @@ type Conn struct { dumpState *stateDump endpointUpdater *EndpointUpdater + + // Connection stage timestamps for metrics + metricsRecorder MetricsRecorder + hasBeenConnected bool // Track if we've ever established a successful connection + isReconnectionAttempt bool // Track if current attempt is a reconnection + stageTimestamps metrics.ConnectionStageTimestamps + stagesMutex sync.Mutex } // NewConn creates a new not opened Conn to the remote peer. @@ -126,6 +146,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) { connLog := log.WithField("peer", config.Key) + dumpState := newStateDump(config.Key, connLog, services.StatusRecorder) var conn = &Conn{ Log: connLog, config: config, @@ -137,10 +158,13 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) { semaphore: services.Semaphore, statusRelay: worker.NewAtomicStatus(), statusICE: worker.NewAtomicStatus(), - dumpState: newStateDump(config.Key, connLog, services.StatusRecorder), + dumpState: dumpState, endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)), + metricsRecorder: services.MetricsRecorder, } + conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState, conn.onWGHandshakeSuccess) + return conn, nil } @@ -148,10 +172,21 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) { // It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will // be used. func (conn *Conn) Open(engineCtx context.Context) error { + // Record the start time - beginning of connection attempt + conn.stagesMutex.Lock() + conn.stageTimestamps.Created = time.Now() + conn.stagesMutex.Unlock() + + // Semaphore.Add() blocks here until there's a free slot if err := conn.semaphore.Add(engineCtx); err != nil { return err } + // Record when semaphore was acquired (after the wait) + conn.stagesMutex.Lock() + conn.stageTimestamps.SemaphoreAcquired = time.Now() + conn.stagesMutex.Unlock() + conn.mu.Lock() defer conn.mu.Unlock() @@ -231,7 +266,7 @@ func (conn *Conn) Close(signalToRemote bool) { conn.Log.Infof("close peer connection") conn.ctxCancel() - conn.workerRelay.DisableWgWatcher() + conn.wgWatcher.DisableWgWatcher() conn.workerRelay.CloseConn() conn.workerICE.Close() @@ -260,6 +295,14 @@ func (conn *Conn) Close(signalToRemote bool) { } conn.setStatusToDisconnected() + + // Reset connection metrics state + conn.stagesMutex.Lock() + conn.hasBeenConnected = false + conn.isReconnectionAttempt = false + conn.stageTimestamps = metrics.ConnectionStageTimestamps{} + conn.stagesMutex.Unlock() + conn.opened = false conn.wg.Wait() conn.Log.Infof("peer connection closed") @@ -292,6 +335,22 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) { func (conn *Conn) OnRemoteOffer(offer OfferAnswer) { conn.dumpState.RemoteOffer() conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay) + + conn.stagesMutex.Lock() + // Detect reconnection: we had been connected before, but now both ICE and Relay are disconnected + if conn.hasBeenConnected && conn.evalStatus() != StatusConnected { + conn.isReconnectionAttempt = true + conn.stageTimestamps = metrics.ConnectionStageTimestamps{} // Reset for new reconnection attempt + now := time.Now() + conn.stageTimestamps.Created = now + conn.stageTimestamps.Signaling = now + conn.Log.Infof("Reconnection triggered by remote offer") + } else if conn.stageTimestamps.Signaling.IsZero() { + // First time receiving offer for this connection attempt (signaling start) + conn.stageTimestamps.Signaling = time.Now() + } + conn.stagesMutex.Unlock() + conn.handshaker.OnRemoteOffer(offer) } @@ -366,7 +425,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn ep = directEp } - conn.workerRelay.DisableWgWatcher() + conn.wgWatcher.DisableWgWatcher() // todo consider to run conn.wgWatcherWg.Wait() here if conn.wgProxyRelay != nil { @@ -390,6 +449,12 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn conn.wgProxyRelay.RedirectAs(ep) } + conn.wgWatcherWg.Add(1) + go func() { + defer conn.wgWatcherWg.Done() + conn.wgWatcher.EnableWgWatcher(conn.ctx, nil) + }() + conn.currentConnPriority = priority conn.statusICE.SetConnected() conn.updateIceState(iceConnInfo) @@ -423,15 +488,17 @@ func (conn *Conn) onICEStateDisconnected() { conn.Log.Errorf("failed to switch to relay conn: %v", err) } + conn.wgWatcher.DisableWgWatcher() conn.wgWatcherWg.Add(1) go func() { defer conn.wgWatcherWg.Done() - conn.workerRelay.EnableWgWatcher(conn.ctx) + conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected) }() conn.wgProxyRelay.Work() conn.currentConnPriority = conntype.Relay } else { conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String()) + conn.wgWatcher.DisableWgWatcher() conn.currentConnPriority = conntype.None if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil { conn.Log.Errorf("failed to remove wg endpoint: %v", err) @@ -500,10 +567,11 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) { return } + conn.wgWatcher.DisableWgWatcher() conn.wgWatcherWg.Add(1) go func() { defer conn.wgWatcherWg.Done() - conn.workerRelay.EnableWgWatcher(conn.ctx) + conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected) }() wgConfigWorkaround() @@ -560,7 +628,15 @@ func (conn *Conn) onGuardEvent() { conn.dumpState.SendOffer() if err := conn.handshaker.SendOffer(); err != nil { conn.Log.Errorf("failed to send offer: %v", err) + return } + + // Record signaling start timestamp (first signal sent) + conn.stagesMutex.Lock() + if conn.stageTimestamps.Signaling.IsZero() { + conn.stageTimestamps.Signaling = time.Now() + } + conn.stagesMutex.Unlock() } func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) { @@ -625,6 +701,20 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd runtime.GC() } + // Record connection ready timestamp and mark as connected + conn.stagesMutex.Lock() + if conn.stageTimestamps.ConnectionReady.IsZero() { + conn.stageTimestamps.ConnectionReady = time.Now() + } + // Mark that we've established a connection + conn.hasBeenConnected = true + + // todo: remove this when fixed the wireguard watcher + conn.stageTimestamps.WgHandshakeSuccess = time.Now() + conn.recordConnectionMetrics() + + conn.stagesMutex.Unlock() + if conn.onConnected != nil { conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr) } @@ -734,6 +824,50 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) { conn.wgProxyRelay = proxy } +// onWGDisconnected is called when the WireGuard handshake times out +func (conn *Conn) onWGDisconnected() { + conn.workerRelay.CloseConn() + conn.onRelayDisconnected() +} + +// onWGHandshakeSuccess is called when the first WireGuard handshake is detected +func (conn *Conn) onWGHandshakeSuccess() { + conn.stagesMutex.Lock() + defer conn.stagesMutex.Unlock() + + /* + if conn.stageTimestamps.WgHandshakeSuccess.IsZero() { + conn.stageTimestamps.WgHandshakeSuccess = time.Now() + conn.recordConnectionMetrics() + } + + */ +} + +// recordConnectionMetrics records connection stage timestamps as metrics +func (conn *Conn) recordConnectionMetrics() { + if conn.metricsRecorder == nil { + return + } + + // Determine connection type based on current priority + var connType metrics.ConnectionType + switch conn.currentConnPriority { + case conntype.Relay: + connType = metrics.ConnectionTypeRelay + default: + connType = metrics.ConnectionTypeICE + } + + // Record metrics with timestamps - duration calculation happens in metrics package + conn.metricsRecorder.RecordConnectionStages( + context.Background(), + connType, + conn.isReconnectionAttempt, + conn.stageTimestamps, + ) +} + // AllowedIP returns the allowed IP of the remote peer func (conn *Conn) AllowedIP() netip.Addr { return conn.config.WgConfig.AllowedIps[0].Addr() diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index 0ed200fda..39241d596 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -34,14 +34,17 @@ type WGWatcher struct { ctxCancel context.CancelFunc ctxLock sync.Mutex enabledTime time.Time + + onFirstHandshakeFn func() } -func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher { +func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump, onFirstHandshakeFn func()) *WGWatcher { return &WGWatcher{ - log: log, - wgIfaceStater: wgIfaceStater, - peerKey: peerKey, - stateDump: stateDump, + log: log, + wgIfaceStater: wgIfaceStater, + peerKey: peerKey, + stateDump: stateDump, + onFirstHandshakeFn: onFirstHandshakeFn, } } @@ -100,12 +103,17 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex case <-timer.C: handshake, ok := w.handshakeCheck(lastHandshake) if !ok { - onDisconnectedFn() + if onDisconnectedFn != nil { + onDisconnectedFn() + } return } if lastHandshake.IsZero() { - elapsed := handshake.Sub(w.enabledTime).Seconds() + elapsed := w.calcElapsed(handshake) w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake) + if w.onFirstHandshakeFn != nil { + w.onFirstHandshakeFn() + } } lastHandshake = *handshake @@ -134,19 +142,19 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) { // the current know handshake did not change if handshake.Equal(lastHandshake) { - w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake timed out: %v", handshake) return nil, false } // in case if the machine is suspended, the handshake time will be in the past if handshake.Add(checkPeriod).Before(time.Now()) { - w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake timed out: %v", handshake) return nil, false } // error handling for handshake time in the future if handshake.After(time.Now()) { - w.log.Warnf("WireGuard handshake is in the future, closing relay connection: %v", handshake) + w.log.Warnf("WireGuard handshake is in the future: %v", handshake) return nil, false } @@ -164,3 +172,13 @@ func (w *WGWatcher) wgState() (time.Time, error) { } return wgState.LastHandshake, nil } + +// calcElapsed calculates elapsed time since watcher was enabled. +// The watcher started after the wg configuration happens, because of this need to normalise the negative value +func (w *WGWatcher) calcElapsed(handshake *time.Time) float64 { + elapsed := handshake.Sub(w.enabledTime).Seconds() + if elapsed < 0 { + elapsed = 0 + } + return elapsed +} diff --git a/client/internal/peer/wg_watcher_test.go b/client/internal/peer/wg_watcher_test.go index d7c277eff..3442fa786 100644 --- a/client/internal/peer/wg_watcher_test.go +++ b/client/internal/peer/wg_watcher_test.go @@ -28,7 +28,7 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) { mlog := log.WithField("peer", "tet") mocWgIface := &MocWgIface{} - watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{})) + watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -57,7 +57,7 @@ func TestWGWatcher_ReEnable(t *testing.T) { mlog := log.WithField("peer", "tet") mocWgIface := &MocWgIface{} - watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{})) + watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index f584487f5..a9fb72d2b 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -30,8 +30,6 @@ type WorkerRelay struct { relayLock sync.Mutex relaySupportedOnRemotePeer atomic.Bool - - wgWatcher *WGWatcher } func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager, stateDump *stateDump) *WorkerRelay { @@ -42,7 +40,6 @@ func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnC config: config, conn: conn, relayManager: relayManager, - wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key, stateDump), } return r } @@ -93,14 +90,6 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { }) } -func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) { - w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected) -} - -func (w *WorkerRelay) DisableWgWatcher() { - w.wgWatcher.DisableWgWatcher() -} - func (w *WorkerRelay) RelayInstanceAddress() (string, error) { return w.relayManager.RelayInstanceAddress() } @@ -125,14 +114,6 @@ func (w *WorkerRelay) CloseConn() { } } -func (w *WorkerRelay) onWGDisconnected() { - w.relayLock.Lock() - _ = w.relayedConn.Close() - w.relayLock.Unlock() - - w.conn.onRelayDisconnected() -} - func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool { if !w.relayManager.HasRelayAddress() { return false @@ -148,6 +129,5 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st } func (w *WorkerRelay) onRelayClientDisconnected() { - w.wgWatcher.DisableWgWatcher() go w.conn.onRelayDisconnected() } diff --git a/client/server/debug.go b/client/server/debug.go index 056d9df21..a6e5926e9 100644 --- a/client/server/debug.go +++ b/client/server/debug.go @@ -32,12 +32,20 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) ( log.Warnf("failed to get latest sync response: %v", err) } + var clientMetrics debug.MetricsExporter + if s.connectClient != nil { + if engine := s.connectClient.Engine(); engine != nil { + clientMetrics = engine.GetClientMetrics() + } + } + bundleGenerator := debug.NewBundleGenerator( debug.GeneratorDependencies{ InternalConfig: s.config, StatusRecorder: s.statusRecorder, SyncResponse: syncResponse, LogFile: s.logFile, + ClientMetrics: clientMetrics, }, debug.BundleConfig{ Anonymize: req.GetAnonymize(), diff --git a/client/server/server.go b/client/server/server.go index 66dca5cd2..7b6c4e98c 100644 --- a/client/server/server.go +++ b/client/server/server.go @@ -24,7 +24,6 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" "github.com/netbirdio/netbird/client/internal/auth" - "github.com/netbirdio/netbird/client/internal/metrics" "github.com/netbirdio/netbird/client/internal/profilemanager" "github.com/netbirdio/netbird/client/system" mgm "github.com/netbirdio/netbird/shared/management/client" @@ -77,7 +76,6 @@ type Server struct { statusRecorder *peer.Status sessionWatcher *internal.SessionWatcher - clientMetrics *metrics.ClientMetrics lastProbe time.Time persistSyncResponse bool @@ -111,7 +109,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable profilesDisabled: profilesDisabled, updateSettingsDisabled: updateSettingsDisabled, jwtCache: newJWTCache(), - clientMetrics: metrics.NewClientMetrics(), } } @@ -1527,7 +1524,7 @@ func (s *Server) GetFeatures(ctx context.Context, msg *proto.GetFeaturesRequest) func (s *Server) connect(ctx context.Context, config *profilemanager.Config, statusRecorder *peer.Status, doInitialAutoUpdate bool, runningChan chan struct{}) error { log.Tracef("running client connection") - s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, doInitialAutoUpdate, s.clientMetrics) + s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, doInitialAutoUpdate) s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse) if err := s.connectClient.Run(runningChan); err != nil { return err diff --git a/shared/management/client/client.go b/shared/management/client/client.go index 3126bcd1f..8a89010eb 100644 --- a/shared/management/client/client.go +++ b/shared/management/client/client.go @@ -20,6 +20,7 @@ type Client interface { GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error) GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error) + GetServerURL() string IsHealthy() bool SyncMeta(sysInfo *system.Info) error Logout() error diff --git a/shared/management/client/grpc.go b/shared/management/client/grpc.go index 89860ac9b..a030f1371 100644 --- a/shared/management/client/grpc.go +++ b/shared/management/client/grpc.go @@ -45,6 +45,7 @@ type GrpcClient struct { conn *grpc.ClientConn connStateCallback ConnStateNotifier connStateCallbackLock sync.RWMutex + serverURL string } // NewClient creates a new client to Management service @@ -74,9 +75,15 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE ctx: ctx, conn: conn, connStateCallbackLock: sync.RWMutex{}, + serverURL: addr, }, nil } +// GetServerURL returns the management server URL +func (c *GrpcClient) GetServerURL() string { + return c.serverURL +} + // Close closes connection to the Management Service func (c *GrpcClient) Close() error { return c.conn.Close()