Add client metrics system with OpenTelemetry and VictoriaMetrics support

Implements a comprehensive client metrics system to track peer connection
stages and performance. The system supports multiple backend implementations
(OpenTelemetry, VictoriaMetrics, and no-op) and tracks detailed connection
stage durations from creation through WireGuard handshake.

Key changes:
- Add metrics package with pluggable backend implementations
- Implement OpenTelemetry metrics backend
- Implement VictoriaMetrics metrics backend
- Add no-op metrics implementation for disabled state
- Track connection stages: creation, semaphore, signaling, connection ready, and WireGuard handshake
- Move WireGuard watcher functionality to conn.go
- Refactor engine to integrate metrics tracking
- Add metrics export endpoint in debug server
This commit is contained in:
Zoltán Papp
2026-01-15 22:16:38 +01:00
parent c5eb5ba1c6
commit e3a5c44d37
16 changed files with 694 additions and 80 deletions

View File

@@ -22,7 +22,6 @@ import (
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/listener"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/profilemanager"
"github.com/netbirdio/netbird/client/internal/statemanager"
@@ -53,7 +52,6 @@ type ConnectClient struct {
engineMutex sync.Mutex
persistSyncResponse bool
clientMetrics *metrics.ClientMetrics
}
func NewConnectClient(
@@ -61,7 +59,6 @@ func NewConnectClient(
config *profilemanager.Config,
statusRecorder *peer.Status,
doInitalAutoUpdate bool,
clientMetrics *metrics.ClientMetrics,
) *ConnectClient {
return &ConnectClient{
ctx: ctx,
@@ -69,7 +66,6 @@ func NewConnectClient(
statusRecorder: statusRecorder,
doInitialAutoUpdate: doInitalAutoUpdate,
engineMutex: sync.Mutex{},
clientMetrics: clientMetrics,
}
}
@@ -311,7 +307,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
checks := loginResp.GetChecks()
c.engineMutex.Lock()
engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, stateManager, c.clientMetrics)
engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, stateManager)
engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engine = engine
c.engineMutex.Unlock()

View File

@@ -228,7 +228,13 @@ type localIpUpdater interface {
}
// NewEngine creates a new Connection Engine with probes attached
func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, stateManager *statemanager.Manager, clientMetrics *metrics.ClientMetrics) *Engine {
func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, stateManager *statemanager.Manager) *Engine {
// Initialize metrics based on deployment type
var deploymentType metrics.DeploymentType
if mgmClient != nil {
deploymentType = metrics.DetermineDeploymentType(mgmClient.GetServerURL())
}
engine := &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
@@ -248,7 +254,7 @@ func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signa
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
clientMetrics: clientMetrics,
clientMetrics: metrics.NewClientMetrics(deploymentType, true),
}
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
@@ -294,11 +300,6 @@ func (e *Engine) Stop() error {
e.updateManager.Stop()
}
// Update metrics engine status
if e.clientMetrics != nil {
e.clientMetrics.SetEngineStatus(0) // 0=stopped
}
log.Info("cleaning up status recorder states")
e.statusRecorder.ReplaceOfflinePeers([]peer.State{})
e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
@@ -529,11 +530,6 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
}
}()
// Update metrics engine status
if e.clientMetrics != nil {
e.clientMetrics.SetEngineStatus(1) // 1=running
}
return nil
}
@@ -1400,12 +1396,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
}
serviceDependencies := peer.ServiceDependencies{
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
Semaphore: e.connSemaphore,
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
Semaphore: e.connSemaphore,
MetricsRecorder: e.clientMetrics,
}
peerConn, err := peer.NewConn(config, serviceDependencies)
if err != nil {
@@ -1689,6 +1686,11 @@ func (e *Engine) GetFirewallManager() firewallManager.Manager {
return e.firewall
}
// GetClientMetrics returns the client metrics
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
return e.clientMetrics
}
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {

View File

@@ -0,0 +1,17 @@
package metrics
// ConnectionType represents the type of peer connection
type ConnectionType string
const (
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
ConnectionTypeICE ConnectionType = "ice"
// ConnectionTypeRelay represents a relayed connection
ConnectionTypeRelay ConnectionType = "relay"
)
// String returns the string representation of the connection type
func (c ConnectionType) String() string {
return string(c)
}

View File

@@ -0,0 +1,46 @@
package metrics
import (
"strings"
)
// DeploymentType represents the type of NetBird deployment
type DeploymentType int
const (
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
DeploymentTypeUnknown DeploymentType = iota
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
DeploymentTypeCloud
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
DeploymentTypeSelfHosted
)
// String returns the string representation of the deployment type
func (d DeploymentType) String() string {
switch d {
case DeploymentTypeCloud:
return "cloud"
case DeploymentTypeSelfHosted:
return "selfhosted"
default:
return "selfhosted"
}
}
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
// based on the management URL string
func DetermineDeploymentType(managementURL string) DeploymentType {
if managementURL == "" {
return DeploymentTypeUnknown
}
// Check for NetBird cloud API domain
if strings.Contains(strings.ToLower(managementURL), "api.netbird.io") {
return DeploymentTypeCloud
}
return DeploymentTypeSelfHosted
}

View File

@@ -1,32 +1,62 @@
package metrics
import (
"context"
"io"
"github.com/VictoriaMetrics/metrics"
"time"
)
// ClientMetrics holds all client-side metrics
// metricsImplementation defines the internal interface for metrics implementations
type metricsImplementation interface {
// RecordConnectionStages records connection stage metrics from timestamps
RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
)
// Export exports metrics in Prometheus format
Export(w io.Writer) error
}
type ClientMetrics struct {
// ICE negotiation metrics
iceNegotiationDuration *metrics.Histogram
impl metricsImplementation
}
// ConnectionStageTimestamps holds timestamps for each connection stage
type ConnectionStageTimestamps struct {
Created time.Time
SemaphoreAcquired time.Time
Signaling time.Time // First signal sent (initial) or signal received (reconnection)
ConnectionReady time.Time
WgHandshakeSuccess time.Time
}
// NewClientMetrics creates a new ClientMetrics instance
func NewClientMetrics() *ClientMetrics {
return &ClientMetrics{
// ICE negotiation metrics
iceNegotiationDuration: metrics.NewHistogram(`netbird_client_ice_negotiation_duration_seconds`),
// If enabled is true, uses an OpenTelemetry implementation
// If enabled is false, uses a no-op implementation
func NewClientMetrics(deploymentType DeploymentType, enabled bool) *ClientMetrics {
var impl metricsImplementation
if !enabled {
impl = &noopMetrics{}
} else {
impl = newOtelMetrics(deploymentType)
}
return &ClientMetrics{impl: impl}
}
// RecordICENegotiationDuration records the time taken for ICE negotiation
func (m *ClientMetrics) RecordICENegotiationDuration(seconds float64) {
m.iceNegotiationDuration.Update(seconds)
// RecordConnectionStages calculates stage durations from timestamps and records them
func (c *ClientMetrics) RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
c.impl.RecordConnectionStages(ctx, connectionType, isReconnection, timestamps)
}
// Export writes all metrics in Prometheus format to the provided writer
func (m *ClientMetrics) Export(w io.Writer) error {
metrics.WritePrometheus(w, true)
return nil
// Export exports metrics to the writer
func (c *ClientMetrics) Export(w io.Writer) error {
return c.impl.Export(w)
}

View File

@@ -0,0 +1,22 @@
package metrics
import (
"context"
"io"
)
// noopMetrics is a no-op implementation of metricsImplementation
type noopMetrics struct{}
func (s *noopMetrics) RecordConnectionStages(
_ context.Context,
_ ConnectionType,
_ bool,
_ ConnectionStageTimestamps,
) {
// No-op
}
func (s *noopMetrics) Export(_ io.Writer) error {
return nil
}

View File

@@ -0,0 +1,250 @@
package metrics
import (
"context"
"fmt"
"io"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// otelMetrics is the OpenTelemetry implementation of ClientMetrics
type otelMetrics struct {
reader *sdkmetric.ManualReader
meterProvider *sdkmetric.MeterProvider
meter metric.Meter
// Static attributes applied to all metrics
deploymentType DeploymentType
// Connection stage duration histograms
stageCreationToSemaphore metric.Float64Histogram
stageSemaphoreToSignaling metric.Float64Histogram
stageSignalingToConnection metric.Float64Histogram
stageConnectionToHandshake metric.Float64Histogram
stageTotalCreationToHandshake metric.Float64Histogram
}
func newOtelMetrics(deploymentType DeploymentType) metricsImplementation {
reader := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
otel.SetMeterProvider(meterProvider)
meter := meterProvider.Meter("netbird.client")
stageCreationToSemaphore, err := meter.Float64Histogram(
"netbird.peer.connection.stage.creation_to_semaphore",
metric.WithDescription("Duration from connection creation to semaphore acquisition"),
metric.WithUnit("s"),
)
if err != nil {
return &noopMetrics{}
}
stageSemaphoreToSignaling, err := meter.Float64Histogram(
"netbird.peer.connection.stage.semaphore_to_signaling",
metric.WithDescription("Duration from semaphore acquisition to signaling start"),
metric.WithUnit("s"),
)
if err != nil {
return &noopMetrics{}
}
stageSignalingToConnection, err := meter.Float64Histogram(
"netbird.peer.connection.stage.signaling_to_connection",
metric.WithDescription("Duration from signaling start to connection ready"),
metric.WithUnit("s"),
)
if err != nil {
return &noopMetrics{}
}
stageConnectionToHandshake, err := meter.Float64Histogram(
"netbird.peer.connection.stage.connection_to_handshake",
metric.WithDescription("Duration from connection ready to WireGuard handshake success"),
metric.WithUnit("s"),
)
if err != nil {
return &noopMetrics{}
}
stageTotalCreationToHandshake, err := meter.Float64Histogram(
"netbird.peer.connection.total.creation_to_handshake",
metric.WithDescription("Total duration from connection creation to WireGuard handshake success"),
metric.WithUnit("s"),
)
if err != nil {
return &noopMetrics{}
}
return &otelMetrics{
reader: reader,
meterProvider: meterProvider,
meter: meter,
deploymentType: deploymentType,
stageCreationToSemaphore: stageCreationToSemaphore,
stageSemaphoreToSignaling: stageSemaphoreToSignaling,
stageSignalingToConnection: stageSignalingToConnection,
stageConnectionToHandshake: stageConnectionToHandshake,
stageTotalCreationToHandshake: stageTotalCreationToHandshake,
}
}
// RecordConnectionStages records the duration of each connection stage from timestamps
func (m *otelMetrics) RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
// Calculate stage durations
var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64
if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() {
creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds()
}
if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() {
semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds()
}
if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() {
signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds()
}
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
}
if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds()
}
// Determine attempt type
attemptType := "initial"
if isReconnection {
attemptType = "reconnection"
}
// Combine deployment type, connection type, and attempt type attributes
attrs := metric.WithAttributes(
attribute.String("deployment_type", m.deploymentType.String()),
attribute.String("connection_type", connectionType.String()),
attribute.String("attempt_type", attemptType),
)
m.stageCreationToSemaphore.Record(ctx, creationToSemaphore, attrs)
m.stageSemaphoreToSignaling.Record(ctx, semaphoreToSignaling, attrs)
m.stageSignalingToConnection.Record(ctx, signalingToConnection, attrs)
m.stageConnectionToHandshake.Record(ctx, connectionToHandshake, attrs)
m.stageTotalCreationToHandshake.Record(ctx, totalDuration, attrs)
}
// Export writes metrics in Prometheus text format
func (m *otelMetrics) Export(w io.Writer) error {
if m.reader == nil {
return fmt.Errorf("metrics reader not initialized")
}
// Collect current metrics
var rm metricdata.ResourceMetrics
if err := m.reader.Collect(context.Background(), &rm); err != nil {
return fmt.Errorf("failed to collect metrics: %w", err)
}
// Iterate through scope metrics and write in Prometheus format
for _, sm := range rm.ScopeMetrics {
for _, m := range sm.Metrics {
// Write HELP line
if _, err := fmt.Fprintf(w, "# HELP %s %s\n", m.Name, m.Description); err != nil {
return err
}
// Write TYPE line
if _, err := fmt.Fprintf(w, "# TYPE %s histogram\n", m.Name); err != nil {
return err
}
// Handle histogram data
if hist, ok := m.Data.(metricdata.Histogram[float64]); ok {
for _, dp := range hist.DataPoints {
// Build label string from attributes
labelStr := ""
if len(dp.Attributes.ToSlice()) > 0 {
labels := ""
for _, attr := range dp.Attributes.ToSlice() {
if labels != "" {
labels += ","
}
labels += fmt.Sprintf("%s=\"%s\"", attr.Key, attr.Value.AsString())
}
labelStr = labels
}
// Write bucket counts
cumulativeCount := uint64(0)
for i, bound := range dp.Bounds {
cumulativeCount += dp.BucketCounts[i]
bucketLabel := labelStr
if bucketLabel != "" {
bucketLabel += ","
}
bucketLabel += fmt.Sprintf("le=\"%g\"", bound)
if _, err := fmt.Fprintf(w, "%s_bucket{%s} %d\n",
m.Name, bucketLabel, cumulativeCount); err != nil {
return err
}
}
// Write +Inf bucket (last bucket count)
if len(dp.BucketCounts) > len(dp.Bounds) {
cumulativeCount += dp.BucketCounts[len(dp.BucketCounts)-1]
}
bucketLabel := labelStr
if bucketLabel != "" {
bucketLabel += ","
}
bucketLabel += "le=\"+Inf\""
if _, err := fmt.Fprintf(w, "%s_bucket{%s} %d\n",
m.Name, bucketLabel, cumulativeCount); err != nil {
return err
}
// Write sum
if labelStr != "" {
if _, err := fmt.Fprintf(w, "%s_sum{%s} %g\n", m.Name, labelStr, dp.Sum); err != nil {
return err
}
} else {
if _, err := fmt.Fprintf(w, "%s_sum %g\n", m.Name, dp.Sum); err != nil {
return err
}
}
// Write count
if labelStr != "" {
if _, err := fmt.Fprintf(w, "%s_count{%s} %d\n", m.Name, labelStr, dp.Count); err != nil {
return err
}
} else {
if _, err := fmt.Fprintf(w, "%s_count %d\n", m.Name, dp.Count); err != nil {
return err
}
}
}
}
// Empty line between metrics
if _, err := fmt.Fprintf(w, "\n"); err != nil {
return err
}
}
}
return nil
}

View File

@@ -0,0 +1,106 @@
package metrics
import (
"context"
"fmt"
"io"
"github.com/VictoriaMetrics/metrics"
)
// victoriaMetrics is the VictoriaMetrics implementation of ClientMetrics
type victoriaMetrics struct {
// Static attributes applied to all metrics
deploymentType DeploymentType
// Metrics set for managing all metrics
set *metrics.Set
}
func newVictoriaMetrics(deploymentType DeploymentType) metricsImplementation {
return &victoriaMetrics{
deploymentType: deploymentType,
set: metrics.NewSet(),
}
}
// RecordConnectionStages records the duration of each connection stage from timestamps
func (m *victoriaMetrics) RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
// Calculate stage durations
var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64
if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() {
creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds()
}
if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() {
semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds()
}
if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() {
signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds()
}
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
}
if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds()
}
// Determine attempt type
attemptType := "initial"
if isReconnection {
attemptType = "reconnection"
}
connTypeStr := connectionType.String()
// Record observations using histograms
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_creation_to_semaphore", connTypeStr, attemptType),
).Update(creationToSemaphore)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_semaphore_to_signaling", connTypeStr, attemptType),
).Update(semaphoreToSignaling)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_signaling_to_connection", connTypeStr, attemptType),
).Update(signalingToConnection)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_connection_to_handshake", connTypeStr, attemptType),
).Update(connectionToHandshake)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType),
).Update(totalDuration)
}
// getMetricName constructs a metric name with labels
func (m *victoriaMetrics) getMetricName(baseName, connectionType, attemptType string) string {
return fmt.Sprintf(`%s{deployment_type=%q,connection_type=%q,attempt_type=%q}`,
baseName,
m.deploymentType.String(),
connectionType,
attemptType,
)
}
// Export writes metrics in Prometheus text format
func (m *victoriaMetrics) Export(w io.Writer) error {
if m.set == nil {
return fmt.Errorf("metrics set not initialized")
}
// Write metrics in Prometheus format
m.set.WritePrometheus(w)
return nil
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer/conntype"
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
"github.com/netbirdio/netbird/client/internal/peer/guard"
@@ -28,6 +29,16 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
)
// MetricsRecorder is an interface for recording peer connection metrics
type MetricsRecorder interface {
RecordConnectionStages(
ctx context.Context,
connectionType metrics.ConnectionType,
isReconnection bool,
timestamps metrics.ConnectionStageTimestamps,
)
}
type ServiceDependencies struct {
StatusRecorder *Status
Signaler *Signaler
@@ -36,6 +47,7 @@ type ServiceDependencies struct {
SrWatcher *guard.SRWatcher
Semaphore *semaphoregroup.SemaphoreGroup
PeerConnDispatcher *dispatcher.ConnectionDispatcher
MetricsRecorder MetricsRecorder
}
type WgConfig struct {
@@ -98,6 +110,7 @@ type Conn struct {
workerICE *WorkerICE
workerRelay *WorkerRelay
wgWatcher *WGWatcher
wgWatcherWg sync.WaitGroup
// used to store the remote Rosenpass key for Relayed connection in case of connection update from ice
@@ -115,6 +128,13 @@ type Conn struct {
dumpState *stateDump
endpointUpdater *EndpointUpdater
// Connection stage timestamps for metrics
metricsRecorder MetricsRecorder
hasBeenConnected bool // Track if we've ever established a successful connection
isReconnectionAttempt bool // Track if current attempt is a reconnection
stageTimestamps metrics.ConnectionStageTimestamps
stagesMutex sync.Mutex
}
// NewConn creates a new not opened Conn to the remote peer.
@@ -126,6 +146,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
connLog := log.WithField("peer", config.Key)
dumpState := newStateDump(config.Key, connLog, services.StatusRecorder)
var conn = &Conn{
Log: connLog,
config: config,
@@ -137,10 +158,13 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
semaphore: services.Semaphore,
statusRelay: worker.NewAtomicStatus(),
statusICE: worker.NewAtomicStatus(),
dumpState: newStateDump(config.Key, connLog, services.StatusRecorder),
dumpState: dumpState,
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
metricsRecorder: services.MetricsRecorder,
}
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState, conn.onWGHandshakeSuccess)
return conn, nil
}
@@ -148,10 +172,21 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open(engineCtx context.Context) error {
// Record the start time - beginning of connection attempt
conn.stagesMutex.Lock()
conn.stageTimestamps.Created = time.Now()
conn.stagesMutex.Unlock()
// Semaphore.Add() blocks here until there's a free slot
if err := conn.semaphore.Add(engineCtx); err != nil {
return err
}
// Record when semaphore was acquired (after the wait)
conn.stagesMutex.Lock()
conn.stageTimestamps.SemaphoreAcquired = time.Now()
conn.stagesMutex.Unlock()
conn.mu.Lock()
defer conn.mu.Unlock()
@@ -231,7 +266,7 @@ func (conn *Conn) Close(signalToRemote bool) {
conn.Log.Infof("close peer connection")
conn.ctxCancel()
conn.workerRelay.DisableWgWatcher()
conn.wgWatcher.DisableWgWatcher()
conn.workerRelay.CloseConn()
conn.workerICE.Close()
@@ -260,6 +295,14 @@ func (conn *Conn) Close(signalToRemote bool) {
}
conn.setStatusToDisconnected()
// Reset connection metrics state
conn.stagesMutex.Lock()
conn.hasBeenConnected = false
conn.isReconnectionAttempt = false
conn.stageTimestamps = metrics.ConnectionStageTimestamps{}
conn.stagesMutex.Unlock()
conn.opened = false
conn.wg.Wait()
conn.Log.Infof("peer connection closed")
@@ -292,6 +335,22 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
conn.dumpState.RemoteOffer()
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
conn.stagesMutex.Lock()
// Detect reconnection: we had been connected before, but now both ICE and Relay are disconnected
if conn.hasBeenConnected && conn.evalStatus() != StatusConnected {
conn.isReconnectionAttempt = true
conn.stageTimestamps = metrics.ConnectionStageTimestamps{} // Reset for new reconnection attempt
now := time.Now()
conn.stageTimestamps.Created = now
conn.stageTimestamps.Signaling = now
conn.Log.Infof("Reconnection triggered by remote offer")
} else if conn.stageTimestamps.Signaling.IsZero() {
// First time receiving offer for this connection attempt (signaling start)
conn.stageTimestamps.Signaling = time.Now()
}
conn.stagesMutex.Unlock()
conn.handshaker.OnRemoteOffer(offer)
}
@@ -366,7 +425,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
ep = directEp
}
conn.workerRelay.DisableWgWatcher()
conn.wgWatcher.DisableWgWatcher()
// todo consider to run conn.wgWatcherWg.Wait() here
if conn.wgProxyRelay != nil {
@@ -390,6 +449,12 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.wgProxyRelay.RedirectAs(ep)
}
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(conn.ctx, nil)
}()
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
@@ -423,15 +488,17 @@ func (conn *Conn) onICEStateDisconnected() {
conn.Log.Errorf("failed to switch to relay conn: %v", err)
}
conn.wgWatcher.DisableWgWatcher()
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.workerRelay.EnableWgWatcher(conn.ctx)
conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected)
}()
conn.wgProxyRelay.Work()
conn.currentConnPriority = conntype.Relay
} else {
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())
conn.wgWatcher.DisableWgWatcher()
conn.currentConnPriority = conntype.None
if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
@@ -500,10 +567,11 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
return
}
conn.wgWatcher.DisableWgWatcher()
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.workerRelay.EnableWgWatcher(conn.ctx)
conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected)
}()
wgConfigWorkaround()
@@ -560,7 +628,15 @@ func (conn *Conn) onGuardEvent() {
conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
return
}
// Record signaling start timestamp (first signal sent)
conn.stagesMutex.Lock()
if conn.stageTimestamps.Signaling.IsZero() {
conn.stageTimestamps.Signaling = time.Now()
}
conn.stagesMutex.Unlock()
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
@@ -625,6 +701,20 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd
runtime.GC()
}
// Record connection ready timestamp and mark as connected
conn.stagesMutex.Lock()
if conn.stageTimestamps.ConnectionReady.IsZero() {
conn.stageTimestamps.ConnectionReady = time.Now()
}
// Mark that we've established a connection
conn.hasBeenConnected = true
// todo: remove this when fixed the wireguard watcher
conn.stageTimestamps.WgHandshakeSuccess = time.Now()
conn.recordConnectionMetrics()
conn.stagesMutex.Unlock()
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
}
@@ -734,6 +824,50 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
conn.wgProxyRelay = proxy
}
// onWGDisconnected is called when the WireGuard handshake times out
func (conn *Conn) onWGDisconnected() {
conn.workerRelay.CloseConn()
conn.onRelayDisconnected()
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess() {
conn.stagesMutex.Lock()
defer conn.stagesMutex.Unlock()
/*
if conn.stageTimestamps.WgHandshakeSuccess.IsZero() {
conn.stageTimestamps.WgHandshakeSuccess = time.Now()
conn.recordConnectionMetrics()
}
*/
}
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
if conn.metricsRecorder == nil {
return
}
// Determine connection type based on current priority
var connType metrics.ConnectionType
switch conn.currentConnPriority {
case conntype.Relay:
connType = metrics.ConnectionTypeRelay
default:
connType = metrics.ConnectionTypeICE
}
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),
connType,
conn.isReconnectionAttempt,
conn.stageTimestamps,
)
}
// AllowedIP returns the allowed IP of the remote peer
func (conn *Conn) AllowedIP() netip.Addr {
return conn.config.WgConfig.AllowedIps[0].Addr()

View File

@@ -34,14 +34,17 @@ type WGWatcher struct {
ctxCancel context.CancelFunc
ctxLock sync.Mutex
enabledTime time.Time
onFirstHandshakeFn func()
}
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump, onFirstHandshakeFn func()) *WGWatcher {
return &WGWatcher{
log: log,
wgIfaceStater: wgIfaceStater,
peerKey: peerKey,
stateDump: stateDump,
log: log,
wgIfaceStater: wgIfaceStater,
peerKey: peerKey,
stateDump: stateDump,
onFirstHandshakeFn: onFirstHandshakeFn,
}
}
@@ -100,12 +103,17 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
case <-timer.C:
handshake, ok := w.handshakeCheck(lastHandshake)
if !ok {
onDisconnectedFn()
if onDisconnectedFn != nil {
onDisconnectedFn()
}
return
}
if lastHandshake.IsZero() {
elapsed := handshake.Sub(w.enabledTime).Seconds()
elapsed := w.calcElapsed(handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
if w.onFirstHandshakeFn != nil {
w.onFirstHandshakeFn()
}
}
lastHandshake = *handshake
@@ -134,19 +142,19 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
// the current know handshake did not change
if handshake.Equal(lastHandshake) {
w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake)
w.log.Warnf("WireGuard handshake timed out: %v", handshake)
return nil, false
}
// in case if the machine is suspended, the handshake time will be in the past
if handshake.Add(checkPeriod).Before(time.Now()) {
w.log.Warnf("WireGuard handshake timed out, closing relay connection: %v", handshake)
w.log.Warnf("WireGuard handshake timed out: %v", handshake)
return nil, false
}
// error handling for handshake time in the future
if handshake.After(time.Now()) {
w.log.Warnf("WireGuard handshake is in the future, closing relay connection: %v", handshake)
w.log.Warnf("WireGuard handshake is in the future: %v", handshake)
return nil, false
}
@@ -164,3 +172,13 @@ func (w *WGWatcher) wgState() (time.Time, error) {
}
return wgState.LastHandshake, nil
}
// calcElapsed calculates elapsed time since watcher was enabled.
// The watcher started after the wg configuration happens, because of this need to normalise the negative value
func (w *WGWatcher) calcElapsed(handshake *time.Time) float64 {
elapsed := handshake.Sub(w.enabledTime).Seconds()
if elapsed < 0 {
elapsed = 0
}
return elapsed
}

View File

@@ -28,7 +28,7 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -57,7 +57,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@@ -30,8 +30,6 @@ type WorkerRelay struct {
relayLock sync.Mutex
relaySupportedOnRemotePeer atomic.Bool
wgWatcher *WGWatcher
}
func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager, stateDump *stateDump) *WorkerRelay {
@@ -42,7 +40,6 @@ func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnC
config: config,
conn: conn,
relayManager: relayManager,
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key, stateDump),
}
return r
}
@@ -93,14 +90,6 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
})
}
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected)
}
func (w *WorkerRelay) DisableWgWatcher() {
w.wgWatcher.DisableWgWatcher()
}
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
return w.relayManager.RelayInstanceAddress()
}
@@ -125,14 +114,6 @@ func (w *WorkerRelay) CloseConn() {
}
}
func (w *WorkerRelay) onWGDisconnected() {
w.relayLock.Lock()
_ = w.relayedConn.Close()
w.relayLock.Unlock()
w.conn.onRelayDisconnected()
}
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
if !w.relayManager.HasRelayAddress() {
return false
@@ -148,6 +129,5 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
}
func (w *WorkerRelay) onRelayClientDisconnected() {
w.wgWatcher.DisableWgWatcher()
go w.conn.onRelayDisconnected()
}