Mirror of https://github.com/netbirdio/netbird.git (synced 2026-04-16 15:26:40 +00:00)
* Add client metrics
* Add client metrics system with OpenTelemetry and VictoriaMetrics support

  Implements a comprehensive client metrics system to track peer connection stages and performance. The system supports multiple backend implementations (OpenTelemetry, VictoriaMetrics, and no-op) and tracks detailed connection stage durations from creation through the WireGuard handshake.

  Key changes:
  - Add metrics package with pluggable backend implementations
  - Implement OpenTelemetry metrics backend
  - Implement VictoriaMetrics metrics backend
  - Add no-op metrics implementation for the disabled state
  - Track connection stages: creation, semaphore, signaling, connection ready, and WireGuard handshake
  - Move WireGuard watcher functionality to conn.go
  - Refactor engine to integrate metrics tracking
  - Add metrics export endpoint in debug server
* Add signaling metrics tracking for initial and reconnection attempts
* Reset connection stage timestamps during reconnections to exclude unnecessary metrics tracking
* Delete otel lib from client
* Update unit tests
* Invoke callback on handshake success in WireGuard watcher
* Add NetBird version tracking to client metrics: integrate the NetBird version into the VictoriaMetrics backend and metrics labels; update the `ClientMetrics` constructor and metric name formatting to include version information
* Add sync duration tracking to client metrics: introduce `RecordSyncDuration` for measuring sync message processing time; update all metrics implementations (VictoriaMetrics, no-op) to support the new method; refactor `ClientMetrics` to use `AgentInfo` for static agent data
* Remove the no-op metrics implementation and simplify the ClientMetrics constructor: eliminate the unused `noopMetrics` and refactor `ClientMetrics` to always use the VictoriaMetrics implementation
* Add total duration tracking for connection attempts: calculate the total duration for both initial connections and reconnections, accounting for different timestamp scenarios; update the `Export` method to include Prometheus HELP comments
* Add metrics push support to the VictoriaMetrics integration
* [client] Anchor connection metrics to the first signal received
* Remove the creation_to_semaphore connection stage metric: the semaphore queuing stage (Created → SemaphoreAcquired) is no longer tracked; connection metrics now start from SignalingReceived; docs and the Grafana dashboard updated accordingly
* [client] Add remote push config for metrics with version-based eligibility: introduce remoteconfig.Manager, which fetches a remote JSON config to control the metrics push interval and restrict pushing to a specific agent version range; when NB_METRICS_INTERVAL is set, remote config is bypassed entirely for a local override
* [client] Add a WASM-compatible NewClientMetrics implementation: replace NewClientMetrics in metrics.go with a WASM-specific stub in metrics_js.go, returning nil for compatibility with JS builds; simplify method usage for WASM targets
* Add missing file
* Update the default case in DeploymentType.String to return "unknown" instead of "selfhosted"
* [client] Rework metrics to use timestamped samples instead of histograms: replace cumulative Prometheus histograms with timestamped point-in-time samples that are pushed once and cleared; this fixes metrics for sparse events (connections/syncs that happen once at startup) where rate() and increase() produced incorrect or empty results
  - Switch from the VictoriaMetrics histogram library to raw Prometheus text format with explicit millisecond timestamps
  - Reset samples after a successful push (no resending of stale data)
  - Rename connection_to_handshake → connection_to_wg_handshake
  - Add the netbird_peer_connection_count metric for ICE vs. Relay tracking
  - Simplify the dashboard: point-based scatter plots, donut pie chart
  - Add maxStalenessInterval=1m to VictoriaMetrics to prevent forward-fill
  - Fix deployment_type Unknown returning "selfhosted" instead of "unknown"
  - Fix the inverted shouldPush condition in push.go
* [client] Add an InfluxDB metrics backend alongside VictoriaMetrics: add influxdb.go with timestamped line protocol export for sparse one-shot events; restore victoria.go to use proper Prometheus histograms; update Grafana dashboards, add an InfluxDB datasource, and update docs (Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>)
* [client] Fix metrics issues and update the dev docker setup
  - Fix StopPush not clearing push state, which prevented restart
  - Fix a race condition reading currentConnPriority without a lock in recordConnectionMetrics
  - Fix a stale comment referencing the old metrics server URL
  - Update docker-compose for InfluxDB: add scoped tokens, .env config, init scripts
  - Rename docker-compose.victoria.yml to docker-compose.yml
* [client] Add anonymised peer tracking to pushed metrics: introduce peer_id and connection_pair_id tags on InfluxDB metrics; public keys are hashed (truncated SHA-256) for anonymisation; the connection pair ID is deterministic regardless of which side computes it, enabling deduplication of reconnections in the ICE vs. Relay dashboard; also pin Grafana to v11.6.0 for file-based provisioning and fix datasource UID references
* Remove unused dependencies from go.mod and go.sum
* Refactor the InfluxDB ingest pipeline: extract validation logic
  - Move line validation logic into `validateLine` and `validateField` helper functions
  - Improve error handling with structured validation and a clearer separation of concerns
  - Add stderr redirection for error messages in `create-tokens.sh`
* Set a non-root user in the Dockerfile for the ingest service
* Fix Windows CI: command line too long
* Remove VictoriaMetrics
* Add the hashed peer ID as an Authorization header in metrics push
* Revert InfluxDB in docker compose
* Enable gzip compression and authorization validation for metrics push and ingest
* Reduce code complexity
* Update the debug documentation to include a metrics.txt description
* Increase the `maxBodySize` limit to 50 MB and update the gzip reader wrapping logic
* Refactor deployment type detection to use URL parsing for improved accuracy
* Update readme
* Throttle remote config retries on fetch failure
* Preserve the first WG handshake timestamp, ignore rekeys
* Skip adding an empty metrics.txt to the debug bundle in debug mode
* Update the default metrics server URL to https://ingest.netbird.io
* Atomic metrics export-and-reset to prevent sample loss between the Export and Reset calls
* Fix doc
* Refactor the Push configuration to improve clarity and enforce a minimum push interval
* Remove `minPushInterval` and update the push interval validation logic
* Revert ExportAndReset; it is acceptable data loss
* Fix metrics review issues: rename env var, remove stale infra, add tests
  - Rename NB_METRICS_ENABLED to NB_METRICS_PUSH_ENABLED to clarify that collection is always active (for debug bundles) and only push is opt-in
  - Change the default config URL from staging to production (ingest.netbird.io)
  - Delete the broken Prometheus dashboard (it used non-existent metric names)
  - Delete the unused VictoriaMetrics datasource config
  - Replace the committed .env with a .env.example containing placeholder values
  - Wire Grafana admin credentials through env vars in docker-compose
  - Make metricsStages a pointer to prevent a reset-vs-write race on reconnect
  - Fix a typed-nil interface in the debug bundle path (GetClientMetrics)
  - Use a deterministic field order in the InfluxDB Export (sorted keys)
  - Replace the Authorization header with X-Peer-ID for metrics push
  - Fix the ingest server timeout to use time.Second instead of a float
  - Fix a gzip double-close and stale comments, trim log levels
  - Add tests for influxdb.go and MetricsStages
* Add a login duration metric, ingest tag validation, and duration bounds
  - Add a netbird_login measurement recording login/auth duration to the management server, with a success/failure result tag
  - Validate InfluxDB tags against per-measurement allowlists in the ingest server to prevent arbitrary tag injection
  - Cap all duration fields (*_seconds) at 300s instead of only total_seconds
  - Add ingest server tests for tag/field validation, bounds, and auth
* Add an arch tag to all metrics
* Fix the Grafana dashboard: add arch to drop columns, add login panels
* Validate that NB_METRICS_SERVER_URL is an absolute HTTP(S) URL
* Address review comments: fix README wording, update stale comments
* Clarify that env var precedence does not bypass remote config eligibility
* Remove accidentally committed pprof files

--------

Co-authored-by: Viktor Liu <viktor@netbird.io>
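The anonymisation scheme described above (truncated SHA-256 peer IDs and an order-independent connection pair ID) can be sketched as follows. hashPeerID, connectionPairID, and the 8-byte truncation are illustrative assumptions, not the exact upstream implementation:

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// hashPeerID anonymises a WireGuard public key with a truncated SHA-256 digest.
func hashPeerID(pubKey string) string {
    sum := sha256.Sum256([]byte(pubKey))
    return hex.EncodeToString(sum[:8])
}

// connectionPairID combines the two hashed peer IDs in sorted order, so both
// sides of a connection compute the same ID and reconnections deduplicate.
func connectionPairID(localKey, remoteKey string) string {
    a, b := hashPeerID(localKey), hashPeerID(remoteKey)
    if a > b {
        a, b = b, a
    }
    return a + "-" + b
}

func main() {
    fmt.Println(connectionPairID("peerA-pubkey", "peerB-pubkey"))
    fmt.Println(connectionPairID("peerB-pubkey", "peerA-pubkey")) // identical output
}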
925 lines
27 KiB
Go
package peer

import (
    "context"
    "fmt"
    "net"
    "net/netip"
    "runtime"
    "sync"
    "time"

    "github.com/pion/ice/v4"
    log "github.com/sirupsen/logrus"
    "golang.zx2c4.com/wireguard/wgctrl/wgtypes"

    "github.com/netbirdio/netbird/client/iface/configurer"
    "github.com/netbirdio/netbird/client/iface/wgproxy"
    "github.com/netbirdio/netbird/client/internal/metrics"
    "github.com/netbirdio/netbird/client/internal/peer/conntype"
    "github.com/netbirdio/netbird/client/internal/peer/dispatcher"
    "github.com/netbirdio/netbird/client/internal/peer/guard"
    icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
    "github.com/netbirdio/netbird/client/internal/peer/id"
    "github.com/netbirdio/netbird/client/internal/peer/worker"
    "github.com/netbirdio/netbird/client/internal/stdnet"
    "github.com/netbirdio/netbird/route"
    relayClient "github.com/netbirdio/netbird/shared/relay/client"
)
// MetricsRecorder is an interface for recording peer connection metrics
type MetricsRecorder interface {
    RecordConnectionStages(
        ctx context.Context,
        remotePubKey string,
        connectionType metrics.ConnectionType,
        isReconnection bool,
        timestamps metrics.ConnectionStageTimestamps,
    )
}
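// A no-op MetricsRecorder makes a handy test double when metrics are disabled.
// A minimal sketch (not part of the upstream file), compiling against the
// interface above with the metrics types already imported:
//
//	type noopMetricsRecorder struct{}
//
//	func (noopMetricsRecorder) RecordConnectionStages(
//		context.Context, string, metrics.ConnectionType, bool, metrics.ConnectionStageTimestamps,
//	) {
//	}
//
// Wire it in via ServiceDependencies{MetricsRecorder: noopMetricsRecorder{}}.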
type ServiceDependencies struct {
    StatusRecorder     *Status
    Signaler           *Signaler
    IFaceDiscover      stdnet.ExternalIFaceDiscover
    RelayManager       *relayClient.Manager
    SrWatcher          *guard.SRWatcher
    PeerConnDispatcher *dispatcher.ConnectionDispatcher
    MetricsRecorder    MetricsRecorder
}

type WgConfig struct {
    WgListenPort int
    RemoteKey    string
    WgInterface  WGIface
    AllowedIps   []netip.Prefix
    PreSharedKey *wgtypes.Key
}

type RosenpassConfig struct {
    // PubKey is this peer's Rosenpass public key
    PubKey []byte
    // Addr is this peer's Rosenpass server address (IP:port)
    Addr string

    PermissiveMode bool
}

// ConnConfig is a peer Connection configuration
type ConnConfig struct {
    // Key is a public key of a remote peer
    Key string
    // LocalKey is a public key of a local peer
    LocalKey string

    AgentVersion string

    Timeout time.Duration

    WgConfig WgConfig

    LocalWgPort int

    RosenpassConfig RosenpassConfig

    // ICEConfig ICE protocol configuration
    ICEConfig icemaker.Config
}
type Conn struct {
    Log            *log.Entry
    mu             sync.Mutex
    ctx            context.Context
    ctxCancel      context.CancelFunc
    config         ConnConfig
    statusRecorder *Status
    signaler       *Signaler
    iFaceDiscover  stdnet.ExternalIFaceDiscover
    relayManager   *relayClient.Manager
    srWatcher      *guard.SRWatcher

    onConnected    func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
    onDisconnected func(remotePeer string)
    rosenpassInitializedPresharedKeyValidator func(peerKey string) bool

    statusRelay         *worker.AtomicWorkerStatus
    statusICE           *worker.AtomicWorkerStatus
    currentConnPriority conntype.ConnPriority
    opened              bool // prevents running Close on a connection that was never opened

    workerICE   *WorkerICE
    workerRelay *WorkerRelay

    wgWatcher       *WGWatcher
    wgWatcherWg     sync.WaitGroup
    wgWatcherCancel context.CancelFunc

    // stores the remote Rosenpass key for a relayed connection, in case of a connection update from ICE
    rosenpassRemoteKey []byte

    wgProxyICE   wgproxy.Proxy
    wgProxyRelay wgproxy.Proxy
    handshaker   *Handshaker

    guard *guard.Guard
    wg    sync.WaitGroup

    // for debug purposes
    dumpState *stateDump

    endpointUpdater *EndpointUpdater

    // Connection stage timestamps for metrics
    metricsRecorder MetricsRecorder
    metricsStages   *MetricsStages
}
// NewConn creates a new, not yet opened Conn to the remote peer.
// To establish a connection, run Conn.Open.
func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
    if len(config.WgConfig.AllowedIps) == 0 {
        return nil, fmt.Errorf("allowed IPs is empty")
    }

    connLog := log.WithField("peer", config.Key)

    dumpState := newStateDump(config.Key, connLog, services.StatusRecorder)
    var conn = &Conn{
        Log:             connLog,
        config:          config,
        statusRecorder:  services.StatusRecorder,
        signaler:        services.Signaler,
        iFaceDiscover:   services.IFaceDiscover,
        relayManager:    services.RelayManager,
        srWatcher:       services.SrWatcher,
        statusRelay:     worker.NewAtomicStatus(),
        statusICE:       worker.NewAtomicStatus(),
        dumpState:       dumpState,
        endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
        wgWatcher:       NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
        metricsRecorder: services.MetricsRecorder,
    }

    return conn, nil
}
// Open opens the connection to the remote peer.
// It tries to establish a connection via ICE and relay in parallel; the
// higher-priority connection type is used.
func (conn *Conn) Open(engineCtx context.Context) error {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    if conn.opened {
        return nil
    }

    // Allocate new metrics stages so old goroutines don't corrupt new state
    conn.metricsStages = &MetricsStages{}

    conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)

    conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)

    relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
    workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
    if err != nil {
        return err
    }
    conn.workerICE = workerICE

    conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, conn.metricsStages)

    conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
    if !isForceRelayed() {
        conn.handshaker.AddICEListener(conn.workerICE.OnNewOffer)
    }

    conn.guard = guard.NewGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)

    conn.wg.Add(1)
    go func() {
        defer conn.wg.Done()
        conn.handshaker.Listen(conn.ctx)
    }()
    go conn.dumpState.Start(conn.ctx)

    peerState := State{
        PubKey:           conn.config.Key,
        ConnStatusUpdate: time.Now(),
        ConnStatus:       StatusConnecting,
        Mux:              new(sync.RWMutex),
    }
    if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
        conn.Log.Warnf("error while updating the state err: %v", err)
    }

    conn.wg.Add(1)
    go func() {
        defer conn.wg.Done()
        conn.guard.Start(conn.ctx, conn.onGuardEvent)
    }()
    conn.opened = true
    return nil
}
// Close closes this peer Conn: it stops the workers and proxies, removes the
// WireGuard peer configuration, and updates the status recorder.
func (conn *Conn) Close(signalToRemote bool) {
    conn.mu.Lock()
    defer conn.wgWatcherWg.Wait()
    defer conn.mu.Unlock()

    if !conn.opened {
        conn.Log.Debugf("ignore close connection to peer")
        return
    }

    if signalToRemote {
        if err := conn.signaler.SignalIdle(conn.config.Key); err != nil {
            conn.Log.Errorf("failed to signal idle state to peer: %v", err)
        }
    }

    conn.Log.Infof("close peer connection")
    conn.ctxCancel()

    if conn.wgWatcherCancel != nil {
        conn.wgWatcherCancel()
    }
    conn.workerRelay.CloseConn()
    conn.workerICE.Close()

    if conn.wgProxyRelay != nil {
        err := conn.wgProxyRelay.CloseConn()
        if err != nil {
            conn.Log.Errorf("failed to close wg proxy for relay: %v", err)
        }
        conn.wgProxyRelay = nil
    }

    if conn.wgProxyICE != nil {
        err := conn.wgProxyICE.CloseConn()
        if err != nil {
            conn.Log.Errorf("failed to close wg proxy for ice: %v", err)
        }
        conn.wgProxyICE = nil
    }

    if err := conn.endpointUpdater.RemoveWgPeer(); err != nil {
        conn.Log.Errorf("failed to remove wg endpoint: %v", err)
    }

    if conn.evalStatus() == StatusConnected && conn.onDisconnected != nil {
        conn.onDisconnected(conn.config.WgConfig.RemoteKey)
    }

    conn.setStatusToDisconnected()
    conn.opened = false
    conn.wg.Wait()
    conn.Log.Infof("peer connection closed")
}
// OnRemoteAnswer handles an answer from the remote peer.
// It doesn't block and discards the message if the connection isn't ready.
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) {
    conn.dumpState.RemoteAnswer()
    conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay)
    conn.handshaker.OnRemoteAnswer(answer)
}

// OnRemoteCandidate handles an ICE connection candidate provided by the remote peer.
func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
    conn.dumpState.RemoteCandidate()
    conn.workerICE.OnRemoteCandidate(candidate, haRoutes)
}

// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer is established
func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) {
    conn.onConnected = handler
}

// SetOnDisconnected sets a handler function to be triggered by Conn when the connection to a remote peer is disconnected
func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
    conn.onDisconnected = handler
}

// SetRosenpassInitializedPresharedKeyValidator sets a function to check if Rosenpass has taken over
// PSK management for a peer. When this returns true, presharedKey() returns nil
// to prevent UpdatePeer from overwriting the Rosenpass-managed PSK.
func (conn *Conn) SetRosenpassInitializedPresharedKeyValidator(handler func(peerKey string) bool) {
    conn.rosenpassInitializedPresharedKeyValidator = handler
}

// OnRemoteOffer handles an offer from the remote peer.
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
    conn.dumpState.RemoteOffer()
    conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
    conn.handshaker.OnRemoteOffer(offer)
}

// WgConfig returns the WireGuard config
func (conn *Conn) WgConfig() WgConfig {
    return conn.config.WgConfig
}

// IsConnected returns true if the peer is connected
func (conn *Conn) IsConnected() bool {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    return conn.evalStatus() == StatusConnected
}

// GetKey returns the public key of the remote peer.
func (conn *Conn) GetKey() string {
    return conn.config.Key
}

func (conn *Conn) ConnID() id.ConnID {
    return id.ConnID(conn)
}
// onICEConnectionIsReady starts proxying traffic from/to the local WireGuard interface and sets the connection status to StatusConnected
func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConnInfo ICEConnInfo) {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    if conn.ctx.Err() != nil {
        return
    }

    if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
        conn.Log.Errorf("remote ICE connection is nil")
        return
    }

    // this should never happen, because Relay is the lower priority and ICE always closes the deprecated connection before upgrading
    // todo consider removing this check
    if conn.currentConnPriority > priority {
        conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
        conn.statusICE.SetConnected()
        conn.updateIceState(iceConnInfo, time.Now())
        return
    }

    conn.Log.Infof("set ICE to active connection")
    conn.dumpState.P2PConnected()

    var (
        ep      *net.UDPAddr
        wgProxy wgproxy.Proxy
        err     error
    )
    if iceConnInfo.RelayedOnLocal {
        conn.dumpState.NewLocalProxy()
        wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
        if err != nil {
            conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
            return
        }
        ep = wgProxy.EndpointAddr()
        conn.wgProxyICE = wgProxy
    } else {
        directEp, err := net.ResolveUDPAddr("udp", iceConnInfo.RemoteConn.RemoteAddr().String())
        if err != nil {
            log.Errorf("failed to resolve UDP address: %v", err)
            conn.handleConfigurationFailure(err, nil)
            return
        }
        ep = directEp
    }

    if conn.wgProxyRelay != nil {
        conn.wgProxyRelay.Pause()
    }

    if wgProxy != nil {
        wgProxy.Work()
    }

    conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
    updateTime := time.Now()
    conn.enableWgWatcherIfNeeded(updateTime)

    presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
    if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
        conn.handleConfigurationFailure(err, wgProxy)
        return
    }
    wgConfigWorkaround()

    if conn.wgProxyRelay != nil {
        conn.Log.Debugf("redirect packets from relayed conn to WireGuard")
        conn.wgProxyRelay.RedirectAs(ep)
    }

    conn.currentConnPriority = priority
    conn.statusICE.SetConnected()
    conn.updateIceState(iceConnInfo, updateTime)
    conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
}
func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    if conn.ctx.Err() != nil {
        return
    }

    conn.Log.Tracef("ICE connection state changed to disconnected")

    if conn.wgProxyICE != nil {
        if err := conn.wgProxyICE.CloseConn(); err != nil {
            conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
        }
    }

    // switch back to relay connection
    if conn.isReadyToUpgrade() {
        conn.Log.Infof("ICE disconnected, set Relay to active connection")
        conn.dumpState.SwitchToRelay()
        if sessionChanged {
            conn.resetEndpoint()
        }

        // todo consider moving this after ConfigureWGEndpoint
        conn.wgProxyRelay.Work()

        presharedKey := conn.presharedKey(conn.rosenpassRemoteKey)
        if err := conn.endpointUpdater.SwitchWGEndpoint(conn.wgProxyRelay.EndpointAddr(), presharedKey); err != nil {
            conn.Log.Errorf("failed to switch to relay conn: %v", err)
        }

        conn.currentConnPriority = conntype.Relay
    } else {
        conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())
        conn.currentConnPriority = conntype.None
        if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
            conn.Log.Errorf("failed to remove wg endpoint: %v", err)
        }
    }

    changed := conn.statusICE.Get() != worker.StatusDisconnected
    if changed {
        conn.guard.SetICEConnDisconnected()
    }
    conn.statusICE.SetDisconnected()

    conn.disableWgWatcherIfNeeded()

    if conn.currentConnPriority == conntype.None {
        conn.metricsStages.Disconnected()
    }

    peerState := State{
        PubKey:           conn.config.Key,
        ConnStatus:       conn.evalStatus(),
        Relayed:          conn.isRelayed(),
        ConnStatusUpdate: time.Now(),
    }
    if err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState); err != nil {
        conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
    }
}
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    if conn.ctx.Err() != nil {
        if err := rci.relayedConn.Close(); err != nil {
            conn.Log.Warnf("failed to close unnecessary relayed connection: %v", err)
        }
        return
    }

    conn.dumpState.RelayConnected()
    conn.Log.Debugf("Relay connection has been established, setup the WireGuard")

    wgProxy, err := conn.newProxy(rci.relayedConn)
    if err != nil {
        conn.Log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
        return
    }
    wgProxy.SetDisconnectListener(conn.onRelayDisconnected)

    conn.dumpState.NewLocalProxy()

    conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())

    if conn.isICEActive() {
        conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
        conn.setRelayedProxy(wgProxy)
        conn.statusRelay.SetConnected()
        conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
        return
    }

    controller := isController(conn.config)

    if controller {
        wgProxy.Work()
    }
    updateTime := time.Now()
    conn.enableWgWatcherIfNeeded(updateTime)
    if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), conn.presharedKey(rci.rosenpassPubKey)); err != nil {
        if err := wgProxy.CloseConn(); err != nil {
            conn.Log.Warnf("Failed to close relay connection: %v", err)
        }
        conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
        return
    }
    if !controller {
        wgProxy.Work()
    }

    wgConfigWorkaround()

    conn.rosenpassRemoteKey = rci.rosenpassPubKey
    conn.currentConnPriority = conntype.Relay
    conn.statusRelay.SetConnected()
    conn.setRelayedProxy(wgProxy)
    conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
    conn.Log.Infof("start to communicate with peer via relay")
    conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
}
func (conn *Conn) onRelayDisconnected() {
    conn.mu.Lock()
    defer conn.mu.Unlock()
    conn.handleRelayDisconnectedLocked()
}

// handleRelayDisconnectedLocked handles relay disconnection. Caller must hold conn.mu.
func (conn *Conn) handleRelayDisconnectedLocked() {
    if conn.ctx.Err() != nil {
        return
    }

    conn.Log.Debugf("relay connection is disconnected")

    if conn.currentConnPriority == conntype.Relay {
        conn.Log.Debugf("clean up WireGuard config")
        conn.currentConnPriority = conntype.None
        if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
            conn.Log.Errorf("failed to remove wg endpoint: %v", err)
        }
    }

    if conn.wgProxyRelay != nil {
        _ = conn.wgProxyRelay.CloseConn()
        conn.wgProxyRelay = nil
    }

    changed := conn.statusRelay.Get() != worker.StatusDisconnected
    if changed {
        conn.guard.SetRelayedConnDisconnected()
    }
    conn.statusRelay.SetDisconnected()

    conn.disableWgWatcherIfNeeded()

    if conn.currentConnPriority == conntype.None {
        conn.metricsStages.Disconnected()
    }

    peerState := State{
        PubKey:           conn.config.Key,
        ConnStatus:       conn.evalStatus(),
        Relayed:          conn.isRelayed(),
        ConnStatusUpdate: time.Now(),
    }
    if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
        conn.Log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
    }
}
// onGuardEvent sends a new offer to the remote peer when the connectivity guard fires.
func (conn *Conn) onGuardEvent() {
    conn.dumpState.SendOffer()
    if err := conn.handshaker.SendOffer(); err != nil {
        conn.Log.Errorf("failed to send offer: %v", err)
    }
}
func (conn *Conn) onWGDisconnected() {
    conn.mu.Lock()
    defer conn.mu.Unlock()

    if conn.ctx.Err() != nil {
        return
    }

    conn.Log.Warnf("WireGuard handshake timeout detected, closing current connection")

    // Close the active connection based on current priority
    switch conn.currentConnPriority {
    case conntype.Relay:
        conn.workerRelay.CloseConn()
        conn.handleRelayDisconnectedLocked()
    case conntype.ICEP2P, conntype.ICETurn:
        conn.workerICE.Close()
    default:
        conn.Log.Debugf("No active connection to close on WG timeout")
    }
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
    peerState := State{
        PubKey:             conn.config.Key,
        ConnStatusUpdate:   updateTime,
        ConnStatus:         conn.evalStatus(),
        Relayed:            conn.isRelayed(),
        RelayServerAddress: relayServerAddr,
        RosenpassEnabled:   isRosenpassEnabled(rosenpassPubKey),
    }

    err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
    if err != nil {
        conn.Log.Warnf("unable to save peer's Relay state, got error: %v", err)
    }
}

func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
    peerState := State{
        PubKey:                     conn.config.Key,
        ConnStatusUpdate:           updateTime,
        ConnStatus:                 conn.evalStatus(),
        Relayed:                    iceConnInfo.Relayed,
        LocalIceCandidateType:      iceConnInfo.LocalIceCandidateType,
        RemoteIceCandidateType:     iceConnInfo.RemoteIceCandidateType,
        LocalIceCandidateEndpoint:  iceConnInfo.LocalIceCandidateEndpoint,
        RemoteIceCandidateEndpoint: iceConnInfo.RemoteIceCandidateEndpoint,
        RosenpassEnabled:           isRosenpassEnabled(iceConnInfo.RosenpassPubKey),
    }

    err := conn.statusRecorder.UpdatePeerICEState(peerState)
    if err != nil {
        conn.Log.Warnf("unable to save peer's ICE state, got error: %v", err)
    }
}

func (conn *Conn) setStatusToDisconnected() {
    conn.statusRelay.SetDisconnected()
    conn.statusICE.SetDisconnected()
    conn.currentConnPriority = conntype.None

    peerState := State{
        PubKey:           conn.config.Key,
        ConnStatus:       StatusIdle,
        ConnStatusUpdate: time.Now(),
        Mux:              new(sync.RWMutex),
    }
    err := conn.statusRecorder.UpdatePeerState(peerState)
    if err != nil {
        // pretty common error because by that time Engine can already remove the peer and status won't be available.
        // todo rethink status updates
        conn.Log.Debugf("error while updating peer's state, err: %v", err)
    }
    if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
        conn.Log.Debugf("failed to reset wireguard stats for peer: %s", err)
    }
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
    if runtime.GOOS == "ios" {
        runtime.GC()
    }

    conn.metricsStages.RecordConnectionReady(updateTime)

    if conn.onConnected != nil {
        conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
    }
}

func (conn *Conn) isRelayed() bool {
    switch conn.currentConnPriority {
    case conntype.Relay, conntype.ICETurn:
        return true
    default:
        return false
    }
}

func (conn *Conn) evalStatus() ConnStatus {
    if conn.statusRelay.Get() == worker.StatusConnected || conn.statusICE.Get() == worker.StatusConnected {
        return StatusConnected
    }

    return StatusConnecting
}

func (conn *Conn) isConnectedOnAllWay() (connected bool) {
    // would be better to protect this with a mutex, but it could cause a deadlock with the Close function

    defer func() {
        if !connected {
            conn.logTraceConnState()
        }
    }()

    // For JS platform: only relay connection is supported
    if runtime.GOOS == "js" {
        return conn.statusRelay.Get() == worker.StatusConnected
    }

    // For non-JS platforms: check ICE connection status
    if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
        return false
    }

    // If relay is supported with peer, it must also be connected
    if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
        if conn.statusRelay.Get() == worker.StatusDisconnected {
            return false
        }
    }

    return true
}
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
    if !conn.wgWatcher.IsEnabled() {
        wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
        conn.wgWatcherCancel = wgWatcherCancel
        conn.wgWatcherWg.Add(1)
        go func() {
            defer conn.wgWatcherWg.Done()
            conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
        }()
    }
}

func (conn *Conn) disableWgWatcherIfNeeded() {
    if conn.currentConnPriority == conntype.None && conn.wgWatcherCancel != nil {
        conn.wgWatcherCancel()
        conn.wgWatcherCancel = nil
    }
}
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
    conn.Log.Debugf("setup proxied WireGuard connection")
    udpAddr := &net.UDPAddr{
        IP:   conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
        Port: conn.config.WgConfig.WgListenPort,
    }

    wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
    if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
        conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
        return nil, err
    }
    return wgProxy, nil
}

func (conn *Conn) resetEndpoint() {
    if !isController(conn.config) {
        return
    }
    conn.Log.Infof("reset wg endpoint")
    conn.wgWatcher.Reset()
    if err := conn.endpointUpdater.RemoveEndpointAddress(); err != nil {
        conn.Log.Warnf("failed to remove endpoint address before update: %v", err)
    }
}
func (conn *Conn) isReadyToUpgrade() bool {
    return conn.wgProxyRelay != nil && conn.currentConnPriority != conntype.Relay
}

func (conn *Conn) isICEActive() bool {
    return (conn.currentConnPriority == conntype.ICEP2P || conn.currentConnPriority == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected
}

func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
    conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
    if wgProxy != nil {
        if ierr := wgProxy.CloseConn(); ierr != nil {
            conn.Log.Warnf("Failed to close wg proxy: %v", ierr)
        }
    }
    if conn.wgProxyRelay != nil {
        conn.wgProxyRelay.Work()
    }
}

func (conn *Conn) logTraceConnState() {
    if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
        conn.Log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
    } else {
        conn.Log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
    }
}
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
    if conn.wgProxyRelay != nil {
        if err := conn.wgProxyRelay.CloseConn(); err != nil {
            conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
        }
    }
    conn.wgProxyRelay = proxy
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
    conn.metricsStages.RecordWGHandshakeSuccess(when)
    conn.recordConnectionMetrics()
}

// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
    if conn.metricsRecorder == nil {
        return
    }

    // Determine connection type based on current priority
    conn.mu.Lock()
    priority := conn.currentConnPriority
    conn.mu.Unlock()

    var connType metrics.ConnectionType
    switch priority {
    case conntype.Relay:
        connType = metrics.ConnectionTypeRelay
    default:
        connType = metrics.ConnectionTypeICE
    }

    // Record metrics with timestamps - duration calculation happens in the metrics package
    conn.metricsRecorder.RecordConnectionStages(
        context.Background(),
        conn.config.Key,
        connType,
        conn.metricsStages.IsReconnection(),
        conn.metricsStages.GetTimestamps(),
    )
}
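// The metrics package derives stage durations from the recorded timestamps,
// anchored at the first received signal. A rough sketch with illustrative
// field names (the real metrics.ConnectionStageTimestamps definition may
// differ):
//
//	signalingToReady := ts.ConnectionReady.Sub(ts.SignalingReceived)
//	readyToHandshake := ts.WGHandshakeSuccess.Sub(ts.ConnectionReady)
//	total := ts.WGHandshakeSuccess.Sub(ts.SignalingReceived) // durations are capped at 300s upstream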
// AllowedIP returns the allowed IP of the remote peer
func (conn *Conn) AllowedIP() netip.Addr {
    return conn.config.WgConfig.AllowedIps[0].Addr()
}

// AgentVersionString returns the agent version of the remote peer
func (conn *Conn) AgentVersionString() string {
    return conn.config.AgentVersion
}
func (conn *Conn) presharedKey(remoteRosenpassKey []byte) *wgtypes.Key {
    if conn.config.RosenpassConfig.PubKey == nil {
        return conn.config.WgConfig.PreSharedKey
    }

    if remoteRosenpassKey == nil && conn.config.RosenpassConfig.PermissiveMode {
        return conn.config.WgConfig.PreSharedKey
    }

    // If Rosenpass has already set a PSK for this peer, return nil to prevent
    // UpdatePeer from overwriting the Rosenpass-managed key.
    if conn.rosenpassInitializedPresharedKeyValidator != nil && conn.rosenpassInitializedPresharedKeyValidator(conn.config.Key) {
        return nil
    }

    // Use the NetBird PSK as the seed for Rosenpass. This same PSK is passed to
    // Rosenpass as PeerConfig.PresharedKey, ensuring the derived post-quantum
    // key is cryptographically bound to the original secret.
    if conn.config.WgConfig.PreSharedKey != nil {
        return conn.config.WgConfig.PreSharedKey
    }

    // Fall back to a deterministic key if no NetBird PSK is configured
    determKey, err := conn.rosenpassDetermKey()
    if err != nil {
        conn.Log.Errorf("failed to generate Rosenpass initial key: %v", err)
        return nil
    }

    return determKey
}
// todo: move this logic into the Rosenpass package
func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
    lk := []byte(conn.config.LocalKey)
    rk := []byte(conn.config.Key) // remote key
    var keyInput []byte
    if string(lk) > string(rk) {
        //nolint:gocritic
        keyInput = append(lk[:16], rk[:16]...)
    } else {
        //nolint:gocritic
        keyInput = append(rk[:16], lk[:16]...)
    }

    key, err := wgtypes.NewKey(keyInput)
    if err != nil {
        return nil, err
    }
    return &key, nil
}
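// Both sides derive the same deterministic key because the two 16-byte halves
// are concatenated in an order both peers agree on: the lexicographically
// greater key's bytes come first. A sketch with illustrative 32-byte keys:
//
//	// peer A: LocalKey = "AAAA...", Key = "BBBB..."  -> keyInput = "BBBB..."[:16] + "AAAA..."[:16]
//	// peer B: LocalKey = "BBBB...", Key = "AAAA..."  -> keyInput = "BBBB..."[:16] + "AAAA..."[:16]
//	// wgtypes.NewKey(keyInput) therefore yields the identical key on both peers.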
func isController(config ConnConfig) bool {
    return config.LocalKey > config.Key
}
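// Controller election is symmetric: both peers compare the same two keys, so
// exactly one side sees LocalKey > Key. For example:
//
//	isController(ConnConfig{LocalKey: "B", Key: "A"}) // true: this side controls
//	isController(ConnConfig{LocalKey: "A", Key: "B"}) // false: the remote side controls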
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
    return remoteRosenpassPubKey != nil
}