diff --git a/docs/otel-review.md b/docs/otel-review.md index 35c47d2..ba824e7 100644 --- a/docs/otel-review.md +++ b/docs/otel-review.md @@ -1,126 +1,126 @@ -# Newt OpenTelemetry Review - -## Overview - -This document summarises the current OpenTelemetry (OTel) instrumentation in Newt, assesses -compliance with OTel guidelines, and lists concrete improvements to pursue before release. -It is based on the implementation in `internal/telemetry` and the call-sites that emit -metrics and traces across the code base. - -## Current metric instrumentation - -All instruments are registered in `internal/telemetry/metrics.go`. They are grouped -into site, tunnel, connection, configuration, build, WebSocket, and proxy domains. -A global attribute filter (see `buildMeterProvider`) constrains exposed label keys to -`site_id`, `region`, and a curated list of low-cardinality dimensions so that Prometheus -exports stay bounded. - -- **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and - `newt_site_last_heartbeat_seconds` capture registration attempts and liveness. They - are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state - callback that publishes observable gauges for the active site. -- **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects, - and active sessions per tunnel (`newt_tunnel_*` family). Attribute helpers respect - the `NEWT_METRICS_INCLUDE_TUNNEL_ID` toggle to keep cardinality manageable on larger - fleets. -- **Connection attempts**: `newt_connection_attempts_total` and - `newt_connection_errors_total` are emitted throughout the WebSocket client to classify - authentication, dial, and transport failures. -- **Operations/configuration**: `newt_config_reloads_total`, - `newt_restart_count_total`, `newt_config_apply_seconds`, and - `newt_cert_rotation_total` provide visibility into blueprint reloads, process boots, - configuration timings, and certificate rotation outcomes. 
-- **Build metadata**: `newt_build_info` records the binary version/commit together - with a monotonic restart counter when build information is supplied at startup. -- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds` and - `newt_websocket_messages_total` report connect latency and ping/pong/text activity. -- **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`, - `newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) and the - `newt_proxy_drops_total` counter are fed from the proxy manager to monitor backlog - and drop behaviour alongside per-protocol byte counters. - -Refer to `docs/observability.md` for a tabular catalogue with instrument types, -attributes, and sample exposition lines. - -## Tracing coverage - -Tracing is optional and enabled only when OTLP export is configured. When active: - -- The admin HTTP mux is wrapped with `otelhttp.NewHandler`, producing spans for - `/metrics` and `/healthz` requests. -- The WebSocket dial path creates a `ws.connect` span around the gRPC-based handshake. - -No other subsystems currently create spans, so data-plane operations, blueprint fetches, -Docker discovery, and WireGuard reconfiguration happen without trace context. - -## Guideline & best-practice alignment - -The implementation adheres to most OTel Go recommendations: - -- **Naming & units** – Every instrument follows the `newt_*` prefix with `_total` - suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are - registered with explicit second-based buckets. -- **Resource attributes** – Service name/version and optional `site_id`/`region` - populate the `resource.Resource` and are also injected as metric attributes for - compatibility with Prometheus queries. -- **Attribute hygiene** – A single attribute filter (`sdkmetric.WithView`) enforces - the allow-list of label keys to prevent accidental high-cardinality emission. 
-- **Runtime metrics** – Go runtime instrumentation is enabled automatically through - `runtime.Start`. -- **Configuration via environment** – `telemetry.FromEnv` honours `OTEL_*` variables - alongside `NEWT_*` overrides so operators can configure exporters without code - changes. -- **Shutdown handling** – `Setup.Shutdown` iterates exporters in reverse order to - flush buffers before process exit. - -## Adjustments & improvements - -The review identified a few actionable adjustments: - -1. **Record registration failures** – `newt_site_registrations_total` is currently - incremented only on success. Emit `result="failure"` samples whenever Pangolin - rejects a registration or credential exchange so operators can alert on churn. -2. **Surface config reload failures** – `telemetry.IncConfigReload` is invoked with - `result="success"` only. Callers should record a failure result when blueprint - parsing or application aborts before success counters are incremented. -3. **Harmonise restart count behaviour** – `newt_restart_count_total` increments only - when build metadata is provided. Consider moving the increment out of - `RegisterBuildInfo` so the counter advances even for ad-hoc builds without version - strings. -4. **Propagate contexts where available** – Many emitters call metric helpers with - `context.Background()`. Passing real contexts (when inexpensive) would allow future - exporters to correlate spans and metrics. -5. **Extend tracing coverage** – Instrument critical flows such as blueprint fetches, - WireGuard reconfiguration, proxy accept loops, and Docker discovery to provide end - to end visibility when OTLP tracing is enabled. - -## Metrics to add before release - -Prioritised additions that would close visibility gaps: - -1. **WebSocket disconnect outcomes** – A counter (e.g., `newt_websocket_disconnects_total`) - partitioned by `reason` would complement the existing connect latency histogram and - explain reconnect storms. -2. 
**Keepalive/heartbeat failures** – Counting ping timeouts or heartbeat misses would - make `newt_site_last_heartbeat_seconds` actionable by providing discrete events. -3. **Proxy connection lifecycle** – Add counters/histograms for proxy accept events and - connection durations to correlate drops with load and backlog metrics. -4. **Blueprint/config pull latency** – Measuring Pangolin blueprint fetch durations and - HTTP status distribution would expose slow control-plane operations. -5. **Certificate rotation attempts** – Complement `newt_cert_rotation_total` with a - duration histogram to observe slow PKI updates and detect stuck rotations. - -These metrics rely on data that is already available in the code paths mentioned -above and would round out operational dashboards. - -## Tracing wishlist - -To benefit from tracing when OTLP is active, add spans around: - -- Pangolin REST calls (wrap the HTTP client with `otelhttp.NewTransport`). -- Docker discovery cycles and target registration callbacks. -- WireGuard reconfiguration (interface bring-up, peer updates). -- Proxy dial/accept loops for both TCP and UDP targets. - -Capturing these stages will let operators correlate latency spikes with reconnects -and proxy drops using distributed traces in addition to the metric signals. +# Newt OpenTelemetry Review + +## Overview + +This document summarises the current OpenTelemetry (OTel) instrumentation in Newt, assesses +compliance with OTel guidelines, and lists concrete improvements to pursue before release. +It is based on the implementation in `internal/telemetry` and the call-sites that emit +metrics and traces across the code base. + +## Current metric instrumentation + +All instruments are registered in `internal/telemetry/metrics.go`. They are grouped +into site, tunnel, connection, configuration, build, WebSocket, and proxy domains. 
+A global attribute filter (see `buildMeterProvider`) constrains exposed label keys to +`site_id`, `region`, and a curated list of low-cardinality dimensions so that Prometheus +exports stay bounded. + +- **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and + `newt_site_last_heartbeat_seconds` capture registration attempts and liveness. They + are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state + callback that publishes observable gauges for the active site. +- **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects, + and active sessions per tunnel (`newt_tunnel_*` family). Attribute helpers respect + the `NEWT_METRICS_INCLUDE_TUNNEL_ID` toggle to keep cardinality manageable on larger + fleets. +- **Connection attempts**: `newt_connection_attempts_total` and + `newt_connection_errors_total` are emitted throughout the WebSocket client to classify + authentication, dial, and transport failures. +- **Operations/configuration**: `newt_config_reloads_total`, + `newt_restart_count_total`, `newt_config_apply_seconds`, and + `newt_cert_rotation_total` provide visibility into blueprint reloads, process boots, + configuration timings, and certificate rotation outcomes. +- **Build metadata**: `newt_build_info` records the binary version/commit together + with a monotonic restart counter when build information is supplied at startup. +- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds` and + `newt_websocket_messages_total` report connect latency and ping/pong/text activity. +- **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`, + `newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) and the + `newt_proxy_drops_total` counter are fed from the proxy manager to monitor backlog + and drop behaviour alongside per-protocol byte counters. 
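As an illustration of the cardinality guard described above, the effect of the attribute allow-list can be sketched in isolation. This is a stdlib-only sketch: the key set and function name below are illustrative, not the actual `buildMeterProvider` implementation.

```go
package main

import (
	"fmt"
	"sort"
)

// allowedKeys stands in for the curated low-cardinality allow-list; the real
// set is defined in buildMeterProvider and may differ.
var allowedKeys = map[string]bool{
	"site_id":  true,
	"region":   true,
	"protocol": true,
	"result":   true,
	"reason":   true,
}

// filterAttrs drops any label key not on the allow-list, mirroring the effect
// the sdkmetric.WithView attribute filter has on exported metric attributes.
func filterAttrs(attrs map[string]string) map[string]string {
	out := make(map[string]string, len(attrs))
	for k, v := range attrs {
		if allowedKeys[k] {
			out[k] = v
		}
	}
	return out
}

func main() {
	filtered := filterAttrs(map[string]string{
		"site_id":   "site-42",
		"protocol":  "tcp",
		"remote_ip": "203.0.113.7", // high-cardinality, silently dropped
	})
	keys := make([]string, 0, len(filtered))
	for k := range filtered {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	fmt.Println(keys) // [protocol site_id]
}
```

Keeping the allow-list in one place means a mistakenly attached label (such as a remote IP) never reaches the Prometheus exposition, regardless of the call-site.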
+
+Refer to `docs/observability.md` for a tabular catalogue with instrument types,
+attributes, and sample exposition lines.
+
+## Tracing coverage
+
+Tracing is optional and enabled only when OTLP export is configured. When active:
+
+- The admin HTTP mux is wrapped with `otelhttp.NewHandler`, producing spans for
+  `/metrics` and `/healthz` requests.
+- The WebSocket dial path creates a `ws.connect` span around the dial and upgrade
+  handshake.
+
+No other subsystems currently create spans, so data-plane operations, blueprint fetches,
+Docker discovery, and WireGuard reconfiguration happen without trace context.
+
+## Guideline & best-practice alignment
+
+The implementation adheres to most OTel Go recommendations:
+
+- **Naming & units** – Every instrument follows the `newt_*` prefix with `_total`
+  suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are
+  registered with explicit second-based buckets.
+- **Resource attributes** – Service name/version and optional `site_id`/`region`
+  populate the `resource.Resource` and are also injected as metric attributes for
+  compatibility with Prometheus queries.
+- **Attribute hygiene** – A single attribute filter (`sdkmetric.WithView`) enforces
+  the allow-list of label keys to prevent accidental high-cardinality emission.
+- **Runtime metrics** – Go runtime instrumentation is enabled automatically through
+  `runtime.Start`.
+- **Configuration via environment** – `telemetry.FromEnv` honours `OTEL_*` variables
+  alongside `NEWT_*` overrides so operators can configure exporters without code
+  changes.
+- **Shutdown handling** – `Setup.Shutdown` iterates exporters in reverse order to
+  flush buffers before process exit.
+
+## Adjustments & improvements
+
+The review identified a few actionable adjustments:
+
+1. **Record registration failures** – `newt_site_registrations_total` is currently
+   incremented only on success. Emit `result="failure"` samples whenever Pangolin
+   rejects a registration or credential exchange so operators can alert on churn.
+2. **Surface config reload failures** – `telemetry.IncConfigReload` is invoked with
+   `result="success"` only. Callers should record a failure result when blueprint
+   parsing or application aborts before success counters are incremented.
+3. **Harmonise restart count behaviour** – `newt_restart_count_total` increments only
+   when build metadata is provided. Consider moving the increment out of
+   `RegisterBuildInfo` so the counter advances even for ad-hoc builds without version
+   strings.
+4. **Propagate contexts where available** – Many emitters call metric helpers with
+   `context.Background()`. Passing real contexts (when inexpensive) would allow future
+   exporters to correlate spans and metrics.
+5. **Extend tracing coverage** – Instrument critical flows such as blueprint fetches,
+   WireGuard reconfiguration, proxy accept loops, and Docker discovery to provide
+   end-to-end visibility when OTLP tracing is enabled.
+
+## Metrics to add before release
+
+Prioritised additions that would close visibility gaps:
+
+1. **WebSocket disconnect outcomes** – A counter (e.g., `newt_websocket_disconnects_total`)
+   partitioned by `reason` would complement the existing connect latency histogram and
+   explain reconnect storms.
+2. **Keepalive/heartbeat failures** – Counting ping timeouts or heartbeat misses would
+   make `newt_site_last_heartbeat_seconds` actionable by providing discrete events.
+3. **Proxy connection lifecycle** – Add counters/histograms for proxy accept events and
+   connection durations to correlate drops with load and backlog metrics.
+4. **Blueprint/config pull latency** – Measuring Pangolin blueprint fetch durations and
+   HTTP status distribution would expose slow control-plane operations.
+5. 
**Certificate rotation attempts** – Complement `newt_cert_rotation_total` with a + duration histogram to observe slow PKI updates and detect stuck rotations. + +These metrics rely on data that is already available in the code paths mentioned +above and would round out operational dashboards. + +## Tracing wishlist + +To benefit from tracing when OTLP is active, add spans around: + +- Pangolin REST calls (wrap the HTTP client with `otelhttp.NewTransport`). +- Docker discovery cycles and target registration callbacks. +- WireGuard reconfiguration (interface bring-up, peer updates). +- Proxy dial/accept loops for both TCP and UDP targets. + +Capturing these stages will let operators correlate latency spikes with reconnects +and proxy drops using distributed traces in addition to the metric signals. diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 1d11927..5403e43 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -45,14 +45,19 @@ var ( mBuildInfo metric.Int64ObservableGauge // WebSocket - mWSConnectLatency metric.Float64Histogram - mWSMessages metric.Int64Counter + mWSConnectLatency metric.Float64Histogram + mWSMessages metric.Int64Counter + mWSDisconnects metric.Int64Counter + mWSKeepaliveFailure metric.Int64Counter + mWSSessionDuration metric.Float64Histogram // Proxy mProxyActiveConns metric.Int64ObservableGauge mProxyBufferBytes metric.Int64ObservableGauge mProxyAsyncBacklogByte metric.Int64ObservableGauge mProxyDropsTotal metric.Int64Counter + mProxyAcceptsTotal metric.Int64Counter + mProxyConnDuration metric.Float64Histogram buildVersion string buildCommit string @@ -179,6 +184,13 @@ func registerBuildWSProxyInstruments() error { metric.WithUnit("s")) mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", metric.WithDescription("WebSocket messages by direction and type")) + mWSDisconnects, _ = meter.Int64Counter("newt_websocket_disconnects_total", + metric.WithDescription("WebSocket 
disconnects by reason/result")) + mWSKeepaliveFailure, _ = meter.Int64Counter("newt_websocket_keepalive_failures_total", + metric.WithDescription("WebSocket keepalive (ping/pong) failures")) + mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds", + metric.WithDescription("Duration of established WebSocket sessions"), + metric.WithUnit("s")) // Proxy mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", metric.WithDescription("Proxy active connections per tunnel and protocol")) @@ -190,6 +202,11 @@ func registerBuildWSProxyInstruments() error { metric.WithUnit("By")) mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", metric.WithDescription("Proxy drops due to write errors")) + mProxyAcceptsTotal, _ = meter.Int64Counter("newt_proxy_accept_total", + metric.WithDescription("Proxy connection accepts by protocol and result")) + mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds", + metric.WithDescription("Duration of completed proxy connections"), + metric.WithUnit("s")) // Register a default callback for build info if version/commit set reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if buildVersion == "" && buildCommit == "" { @@ -328,6 +345,25 @@ func IncWSMessage(ctx context.Context, direction, msgType string) { )...)) } +func IncWSDisconnect(ctx context.Context, reason, result string) { + mWSDisconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("reason", reason), + attribute.String("result", result), + )...)) +} + +func IncWSKeepaliveFailure(ctx context.Context, reason string) { + mWSKeepaliveFailure.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("reason", reason), + )...)) +} + +func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) { + mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( + 
attribute.String("result", result), + )...)) +} + // --- Proxy helpers --- func ObserveProxyActiveConnsObs(o metric.Observer, value int64, attrs []attribute.KeyValue) { @@ -352,6 +388,31 @@ func IncProxyDrops(ctx context.Context, tunnelID, protocol string) { mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) } +func IncProxyAccept(ctx context.Context, tunnelID, protocol, result, reason string) { + attrs := []attribute.KeyValue{ + attribute.String("protocol", protocol), + attribute.String("result", result), + } + if reason != "" { + attrs = append(attrs, attribute.String("reason", reason)) + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyAcceptsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + +func ObserveProxyConnectionDuration(ctx context.Context, tunnelID, protocol, result string, seconds float64) { + attrs := []attribute.KeyValue{ + attribute.String("protocol", protocol), + attribute.String("result", result), + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + // --- Config/PKI helpers --- func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) { diff --git a/main.go b/main.go index 959b906..83f7524 100644 --- a/main.go +++ b/main.go @@ -586,6 +586,10 @@ func main() { // Register handlers for different message types client.RegisterHandler("newt/wg/connect", func(msg websocket.WSMessage) { logger.Info("Received registration message") + regResult := "success" + defer func() { + telemetry.IncSiteRegistration(ctx, regResult) + }() if stopFunc != nil { stopFunc() // stop the ws from sending more requests stopFunc = nil // reset stopFunc to nil to avoid double stopping @@ -605,11 +609,13 @@ func main() { jsonData, err := json.Marshal(msg.Data) 
if err != nil { logger.Info(fmtErrMarshaling, err) + regResult = "failure" return } if err := json.Unmarshal(jsonData, &wgData); err != nil { logger.Info("Error unmarshaling target data: %v", err) + regResult = "failure" return } @@ -620,6 +626,7 @@ func main() { mtuInt) if err != nil { logger.Error("Failed to create TUN device: %v", err) + regResult = "failure" } setDownstreamTNetstack(tnet) @@ -633,6 +640,7 @@ func main() { host, _, err := net.SplitHostPort(wgData.Endpoint) if err != nil { logger.Error("Failed to split endpoint: %v", err) + regResult = "failure" return } @@ -641,6 +649,7 @@ func main() { endpoint, err := resolveDomain(wgData.Endpoint) if err != nil { logger.Error("Failed to resolve endpoint: %v", err) + regResult = "failure" return } @@ -656,12 +665,14 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub err = dev.IpcSet(config) if err != nil { logger.Error("Failed to configure WireGuard device: %v", err) + regResult = "failure" } // Bring up the device err = dev.Up() if err != nil { logger.Error("Failed to bring up WireGuard device: %v", err) + regResult = "failure" } logger.Debug("WireGuard device created. 
Lets ping the server now...") @@ -676,10 +687,11 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Testing initial connection with reliable ping...") lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) if err == nil && wgData.PublicKey != "" { - telemetry.ObserveTunnelLatency(context.Background(), wgData.PublicKey, "wireguard", lat.Seconds()) + telemetry.ObserveTunnelLatency(ctx, wgData.PublicKey, "wireguard", lat.Seconds()) } if err != nil { logger.Warn("Initial reliable ping failed, but continuing: %v", err) + regResult = "failure" } else { logger.Info("Initial connection test successful") } @@ -701,9 +713,6 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub connected = true - // telemetry: record a successful site registration (omit region unless available) - telemetry.IncSiteRegistration(context.Background(), "success") - // add the targets if there are any if len(wgData.Targets.TCP) > 0 { updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) @@ -738,7 +747,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { logger.Info("Received reconnect message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) + telemetry.IncReconnect(ctx, wgData.PublicKey, "server", telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN @@ -767,7 +776,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { logger.Info("Received termination message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) + telemetry.IncReconnect(ctx, wgData.PublicKey, "server", 
telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN @@ -837,7 +846,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub }, } -stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ + stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, @@ -940,7 +949,7 @@ stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ } // Send the ping results to the cloud for selection -stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ + stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, diff --git a/proxy/manager.go b/proxy/manager.go index ceaa12b..31e7788 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "errors" "fmt" "io" "net" @@ -97,6 +98,32 @@ func (cw *countingWriter) Write(p []byte) (int, error) { return n, err } +func classifyProxyError(err error) string { + if err == nil { + return "" + } + if errors.Is(err, net.ErrClosed) { + return "closed" + } + if ne, ok := err.(net.Error); ok { + if ne.Timeout() { + return "timeout" + } + if ne.Temporary() { + return "temporary" + } + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "refused"): + return "refused" + case strings.Contains(msg, "reset"): + return "reset" + default: + return "io_error" + } +} + // NewProxyManager creates a new proxy manager instance func NewProxyManager(tnet *netstack.Net) *ProxyManager { return &ProxyManager{ @@ -467,72 +494,69 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) for { conn, err := listener.Accept() if err != nil { - // Check if we're shutting down or the listener was closed + telemetry.IncProxyAccept(context.Background(), 
pm.currentTunnelID, "tcp", "failure", classifyProxyError(err)) if !pm.running { return } - - // Check for specific network errors that indicate the listener is closed if ne, ok := err.(net.Error); ok && !ne.Temporary() { logger.Info("TCP listener closed, stopping proxy handler for %v", listener.Addr()) return } - logger.Error("Error accepting TCP connection: %v", err) - // Don't hammer the CPU if we hit a temporary error time.Sleep(100 * time.Millisecond) continue } - // Count sessions only once per accepted TCP connection - if pm.currentTunnelID != "" { - state.Global().IncSessions(pm.currentTunnelID) - if e := pm.getEntry(pm.currentTunnelID); e != nil { + tunnelID := pm.currentTunnelID + telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "") + if tunnelID != "" { + state.Global().IncSessions(tunnelID) + if e := pm.getEntry(tunnelID); e != nil { e.activeTCP.Add(1) } } - go func() { + go func(tunnelID string, accepted net.Conn) { + connStart := time.Now() target, err := net.Dial("tcp", targetAddr) if err != nil { logger.Error("Error connecting to target: %v", err) - conn.Close() + accepted.Close() + telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err)) + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds()) return } - // already incremented on accept - - // Create a WaitGroup to ensure both copy operations complete + entry := pm.getEntry(tunnelID) + if entry == nil { + entry = &tunnelEntry{} + } var wg sync.WaitGroup wg.Add(2) - // client -> target (direction=in) - go func() { + go func(ent *tunnelEntry) { defer wg.Done() - e := pm.getEntry(pm.currentTunnelID) - cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"} - _, _ = io.Copy(cw, conn) + cw := &countingWriter{ctx: context.Background(), w: target, set: ent.attrInTCP, pm: pm, ent: ent, out: false, proto: 
"tcp"} + _, _ = io.Copy(cw, accepted) _ = target.Close() - }() + }(entry) - // target -> client (direction=out) - go func() { + go func(ent *tunnelEntry) { defer wg.Done() - e := pm.getEntry(pm.currentTunnelID) - cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"} + cw := &countingWriter{ctx: context.Background(), w: accepted, set: ent.attrOutTCP, pm: pm, ent: ent, out: true, proto: "tcp"} _, _ = io.Copy(cw, target) - _ = conn.Close() - }() + _ = accepted.Close() + }(entry) - // Wait for both copies to complete then session -1 wg.Wait() - if pm.currentTunnelID != "" { - state.Global().DecSessions(pm.currentTunnelID) - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if tunnelID != "" { + state.Global().DecSessions(tunnelID) + if e := pm.getEntry(tunnelID); e != nil { e.activeTCP.Add(-1) } } - }() + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds()) + }(tunnelID, conn) } } @@ -595,16 +619,20 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { targetUDPAddr, err := net.ResolveUDPAddr("udp", targetAddr) if err != nil { logger.Error("Error resolving target address: %v", err) + telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", "resolve") continue } targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) if err != nil { logger.Error("Error connecting to target: %v", err) + telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err)) continue } + tunnelID := pm.currentTunnelID + telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "") // Only increment activeUDP after a successful DialUDP - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if e := pm.getEntry(tunnelID); e != nil { e.activeUDP.Add(1) } @@ -612,18 +640,21 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, 
targetAddr string) { clientConns[clientKey] = targetConn clientsMutex.Unlock() - go func(clientKey string, targetConn *net.UDPConn, remoteAddr net.Addr) { + go func(clientKey string, targetConn *net.UDPConn, remoteAddr net.Addr, tunnelID string) { + start := time.Now() + result := "success" defer func() { // Always clean up when this goroutine exits clientsMutex.Lock() if storedConn, exists := clientConns[clientKey]; exists && storedConn == targetConn { delete(clientConns, clientKey) targetConn.Close() - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if e := pm.getEntry(tunnelID); e != nil { e.activeUDP.Add(-1) } } clientsMutex.Unlock() + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds()) }() buffer := make([]byte, 65507) @@ -631,6 +662,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { n, _, err := targetConn.ReadFromUDP(buffer) if err != nil { logger.Error("Error reading from target: %v", err) + result = "failure" return // defer will handle cleanup } @@ -651,10 +683,11 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { if err != nil { logger.Error("Error writing to client: %v", err) telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") + result = "failure" return // defer will handle cleanup } } - }(clientKey, targetConn, remoteAddr) + }(clientKey, targetConn, remoteAddr, tunnelID) } written, err := targetConn.Write(buffer[:n]) diff --git a/websocket/client.go b/websocket/client.go index ee6c2e6..ba39202 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "os" @@ -42,6 +43,8 @@ type Client struct { writeMux sync.Mutex clientType string // Type of client (e.g., "newt", "olm") tlsConfig TLSConfig + metricsCtxMu sync.RWMutex + metricsCtx context.Context } type ClientOption func(*Client) @@ -85,6 +88,26 @@ func (c *Client) 
 OnTokenUpdate(callback func(token string)) { c.onTokenUpdate = callback }
+func (c *Client) metricsContext() context.Context {
+	c.metricsCtxMu.RLock()
+	defer c.metricsCtxMu.RUnlock()
+	if c.metricsCtx != nil {
+		return c.metricsCtx
+	}
+	return context.Background()
+}
+
+func (c *Client) setMetricsContext(ctx context.Context) {
+	c.metricsCtxMu.Lock()
+	c.metricsCtx = ctx
+	c.metricsCtxMu.Unlock()
+}
+
+// MetricsContext exposes the context used for telemetry emission when a connection is active.
+func (c *Client) MetricsContext() context.Context {
+	return c.metricsContext()
+}
+
 // NewClient creates a new websocket client
 func NewClient(clientType string, ID, secret string, endpoint string, pingInterval time.Duration, pingTimeout time.Duration, opts ...ClientOption) (*Client, error) {
 	config := &Config{
@@ -177,7 +200,7 @@ func (c *Client) SendMessage(messageType string, data interface{}) error {
 	if err := c.conn.WriteJSON(msg); err != nil {
 		return err
 	}
-	telemetry.IncWSMessage(context.Background(), "out", "text")
+	telemetry.IncWSMessage(c.metricsContext(), "out", "text")
 	return nil
 }
@@ -273,8 +296,12 @@ func (c *Client) getToken() (string, error) {
 		return "", fmt.Errorf("failed to marshal token request data: %w", err)
 	}
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
 	// Create a new request
-	req, err := http.NewRequest(
+	req, err := http.NewRequestWithContext(
+		ctx,
 		"POST",
 		baseEndpoint+"/api/v1/auth/"+c.clientType+"/get-token",
 		bytes.NewBuffer(jsonData),
@@ -296,7 +323,8 @@ func (c *Client) getToken() (string, error) {
 	}
 	resp, err := client.Do(req)
 	if err != nil {
-		telemetry.IncConnError(context.Background(), "auth", classifyConnError(err))
+		telemetry.IncConnAttempt(ctx, "auth", "failure")
+		telemetry.IncConnError(ctx, "auth", classifyConnError(err))
 		return "", fmt.Errorf("failed to request new token: %w", err)
 	}
 	defer resp.Body.Close()
@@ -304,15 +332,15 @@ func (c *Client) getToken() (string, error) {
 	if resp.StatusCode != http.StatusOK {
 		body, _ := io.ReadAll(resp.Body)
 		logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
-		telemetry.IncConnAttempt(context.Background(), "auth", "failure")
+		telemetry.IncConnAttempt(ctx, "auth", "failure")
 		etype := "io_error"
 		if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
 			etype = "auth_failed"
 		}
-		telemetry.IncConnError(context.Background(), "auth", etype)
+		telemetry.IncConnError(ctx, "auth", etype)
 		// Reconnect reason mapping for auth failures
 		if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
-			telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonAuthError)
+			telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonAuthError)
 		}
 		return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
 	}
@@ -332,7 +360,7 @@ func (c *Client) getToken() (string, error) {
 	}
 	logger.Debug("Received token: %s", tokenResp.Data.Token)
-	telemetry.IncConnAttempt(context.Background(), "auth", "success")
+	telemetry.IncConnAttempt(ctx, "auth", "success")
 	return tokenResp.Data.Token, nil
 }
@@ -357,6 +385,30 @@ func classifyConnError(err error) string {
 	}
 }
+func classifyWSDisconnect(err error) (result, reason string) {
+	if err == nil {
+		return "success", "normal"
+	}
+	if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
+		return "success", "normal"
+	}
+	if ne, ok := err.(net.Error); ok && ne.Timeout() {
+		return "error", "timeout"
+	}
+	if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+		return "error", "unexpected_close"
+	}
+	msg := strings.ToLower(err.Error())
+	switch {
+	case strings.Contains(msg, "eof"):
+		return "error", "eof"
+	case strings.Contains(msg, "reset"):
+		return "error", "connection_reset"
+	default:
+		return "error", "read_error"
+	}
+}
+
 func (c *Client) connectWithRetry() {
 	for {
 		select {
@@ -375,13 +427,13 @@ func (c *Client) connectWithRetry() {
 }

 func (c *Client) establishConnection() error {
+	ctx := context.Background()
+
 	// Get token for authentication
 	token, err := c.getToken()
 	if err != nil {
-		// telemetry: connection attempt failed before dialing
-		// site_id isn't globally available here; use client ID as site_id (low cardinality)
-		telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
-		telemetry.IncConnError(context.Background(), "websocket", classifyConnError(err))
+		telemetry.IncConnAttempt(ctx, "websocket", "failure")
+		telemetry.IncConnError(ctx, "websocket", classifyConnError(err))
 		return fmt.Errorf("failed to get token: %w", err)
 	}
@@ -416,7 +468,7 @@ func (c *Client) establishConnection() error {
 	// Connect to WebSocket (optional span)
 	tr := otel.Tracer("newt")
-	spanCtx, span := tr.Start(context.Background(), "ws.connect")
+	ctx, span := tr.Start(ctx, "ws.connect")
 	defer span.End()
 	start := time.Now()
@@ -441,38 +493,40 @@ func (c *Client) establishConnection() error {
 		logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable")
 	}
-	conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
+	conn, _, err := dialer.DialContext(ctx, u.String(), nil)
 	lat := time.Since(start).Seconds()
 	if err != nil {
-		telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
+		telemetry.IncConnAttempt(ctx, "websocket", "failure")
 		etype := classifyConnError(err)
-		telemetry.IncConnError(context.Background(), "websocket", etype)
-		telemetry.ObserveWSConnectLatency(context.Background(), lat, "failure", etype)
+		telemetry.IncConnError(ctx, "websocket", etype)
+		telemetry.ObserveWSConnectLatency(ctx, lat, "failure", etype)
 		// Map handshake-related errors to reconnect reasons where appropriate
 		if etype == "tls_handshake" {
-			telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonHandshakeError)
+			telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonHandshakeError)
 		} else if etype == "dial_timeout" {
-			telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonTimeout)
+			telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonTimeout)
 		} else {
-			telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonError)
+			telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError)
 		}
 		return fmt.Errorf("failed to connect to WebSocket: %w", err)
 	}
-	telemetry.IncConnAttempt(context.Background(), "websocket", "success")
-	telemetry.ObserveWSConnectLatency(context.Background(), lat, "success", "")
+	telemetry.IncConnAttempt(ctx, "websocket", "success")
+	telemetry.ObserveWSConnectLatency(ctx, lat, "success", "")
 	c.conn = conn
 	c.setConnected(true)
+	c.setMetricsContext(ctx)
+	sessionStart := time.Now()

 	// Wire up pong handler for metrics
 	c.conn.SetPongHandler(func(appData string) error {
-		telemetry.IncWSMessage(context.Background(), "in", "pong")
+		telemetry.IncWSMessage(c.metricsContext(), "in", "pong")
 		return nil
 	})

 	// Start the ping monitor
 	go c.pingMonitor()
 	// Start the read pump with disconnect detection
-	go c.readPumpWithDisconnectDetection()
+	go c.readPumpWithDisconnectDetection(sessionStart)

 	if c.onConnect != nil {
 		err := c.saveConfig()
@@ -566,7 +620,7 @@ func (c *Client) pingMonitor() {
 			c.writeMux.Lock()
 			err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout))
 			if err == nil {
-				telemetry.IncWSMessage(context.Background(), "out", "ping")
+				telemetry.IncWSMessage(c.metricsContext(), "out", "ping")
 			}
 			c.writeMux.Unlock()
 			if err != nil {
@@ -577,6 +631,7 @@ func (c *Client) pingMonitor() {
 					return
 				default:
 					logger.Error("Ping failed: %v", err)
+					telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
 					c.reconnect()
 					return
@@ -586,11 +641,19 @@ func (c *Client) pingMonitor() {
 }

 // readPumpWithDisconnectDetection reads messages and triggers reconnect on error
-func (c *Client) readPumpWithDisconnectDetection() {
+func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
+	ctx := c.metricsContext()
+	disconnectReason := "shutdown"
+	disconnectResult := "success"
+
 	defer func() {
 		if c.conn != nil {
 			c.conn.Close()
 		}
+		if !started.IsZero() {
+			telemetry.ObserveWSSessionDuration(ctx, time.Since(started).Seconds(), disconnectResult)
+		}
+		telemetry.IncWSDisconnect(ctx, disconnectReason, disconnectResult)
 		// Only attempt reconnect if we're not shutting down
 		select {
 		case <-c.done:
@@ -604,12 +667,14 @@ func (c *Client) readPumpWithDisconnectDetection() {
 	for {
 		select {
 		case <-c.done:
+			disconnectReason = "shutdown"
+			disconnectResult = "success"
 			return
 		default:
 			var msg WSMessage
 			err := c.conn.ReadJSON(&msg)
 			if err == nil {
-				telemetry.IncWSMessage(context.Background(), "in", "text")
+				telemetry.IncWSMessage(c.metricsContext(), "in", "text")
 			}
 			if err != nil {
 				// Check if we're shutting down before logging error
@@ -617,13 +682,18 @@ func (c *Client) readPumpWithDisconnectDetection() {
 				case <-c.done:
 					// Expected during shutdown, don't log as error
 					logger.Debug("WebSocket connection closed during shutdown")
+					disconnectReason = "shutdown"
+					disconnectResult = "success"
 					return
 				default:
 					// Unexpected error during normal operation
-					if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
-						logger.Error("WebSocket read error: %v", err)
-					} else {
-						logger.Debug("WebSocket connection closed: %v", err)
+					disconnectResult, disconnectReason = classifyWSDisconnect(err)
+					if disconnectResult == "error" {
+						if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
+							logger.Error("WebSocket read error: %v", err)
+						} else {
+							logger.Debug("WebSocket connection closed: %v", err)
+						}
 					}
 					return // triggers reconnect via defer
 				}
diff --git a/wg/wg.go b/wg/wg.go
index eccd64f..0ab1919 100644
--- a/wg/wg.go
+++ b/wg/wg.go
@@ -280,6 +280,15 @@ func (s *WireGuardService) LoadRemoteConfig() error {
 }

 func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
+	ctx := context.Background()
+	if s.client != nil {
+		ctx = s.client.MetricsContext()
+	}
+	result := "success"
+	defer func() {
+		telemetry.IncConfigReload(ctx, result)
+	}()
+
 	var config WgConfig

 	logger.Debug("Received message: %v", msg)
@@ -288,11 +297,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
 	jsonData, err := json.Marshal(msg.Data)
 	if err != nil {
 		logger.Info("Error marshaling data: %v", err)
+		result = "failure"
 		return
 	}

 	if err := json.Unmarshal(jsonData, &config); err != nil {
 		logger.Info("Error unmarshaling target data: %v", err)
+		result = "failure"
 		return
 	}
 	s.config = config
@@ -303,27 +314,28 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
 	}

 	// telemetry: config reload success
-	telemetry.IncConfigReload(context.Background(), "success")
 	// Optional reconnect reason mapping: config change
 	if s.serverPubKey != "" {
-		telemetry.IncReconnect(context.Background(), s.serverPubKey, "client", telemetry.ReasonConfigChange)
+		telemetry.IncReconnect(ctx, s.serverPubKey, "client", telemetry.ReasonConfigChange)
 	}

 	// Ensure the WireGuard interface and peers are configured
 	start := time.Now()
 	if err := s.ensureWireguardInterface(config); err != nil {
 		logger.Error("Failed to ensure WireGuard interface: %v", err)
-		telemetry.ObserveConfigApply(context.Background(), "interface", "failure", time.Since(start).Seconds())
+		telemetry.ObserveConfigApply(ctx, "interface", "failure", time.Since(start).Seconds())
+		result = "failure"
 	} else {
-		telemetry.ObserveConfigApply(context.Background(), "interface", "success", time.Since(start).Seconds())
+		telemetry.ObserveConfigApply(ctx, "interface", "success", time.Since(start).Seconds())
 	}

 	startPeers := time.Now()
 	if err := s.ensureWireguardPeers(config.Peers); err != nil {
 		logger.Error("Failed to ensure WireGuard peers: %v", err)
-		telemetry.ObserveConfigApply(context.Background(), "peer", "failure", time.Since(startPeers).Seconds())
+		telemetry.ObserveConfigApply(ctx, "peer", "failure", time.Since(startPeers).Seconds())
+		result = "failure"
 	} else {
-		telemetry.ObserveConfigApply(context.Background(), "peer", "success", time.Since(startPeers).Seconds())
+		telemetry.ObserveConfigApply(ctx, "peer", "success", time.Since(startPeers).Seconds())
 	}
 }