diff --git a/docs/otel-review.md b/docs/otel-review.md new file mode 100644 index 0000000..14cfb53 --- /dev/null +++ b/docs/otel-review.md @@ -0,0 +1,64 @@ +# OpenTelemetry Review + +## Metric inventory +The table below lists every instrument registered by `internal/telemetry/metrics.go`, the helper that emits it, and an example time-series. Attribute sets automatically add `site_id` (and optionally `region`) via `attrsWithSite` unless the observable callback overrides them. 【F:internal/telemetry/metrics.go†L23-L205】【F:internal/telemetry/metrics.go†L289-L403】 + +| Metric | Instrument & unit | Purpose | Emission path | Example series | +| --- | --- | --- | --- | --- | +| `newt_site_registrations_total` | Counter | Counts Pangolin registration attempts keyed by result (`success`, `failure`). | `telemetry.IncSiteRegistration` (called after registration completes). | `newt_site_registrations_total{result="success",site_id="abc"} 1` | +| `newt_site_online` | Observable gauge | 0/1 heartbeat for the active site, driven by the registered `StateView`. | `telemetry.SetObservableCallback` via `state.TelemetryView`. | `newt_site_online{site_id="self"} 1` | +| `newt_site_last_heartbeat_seconds` | Observable gauge | Seconds since the last Pangolin heartbeat. | Same callback as above using `state.TelemetryView.TouchHeartbeat`. | `newt_site_last_heartbeat_seconds{site_id="self"} 3.2` | +| `newt_tunnel_sessions` | Observable gauge | Active sessions per tunnel; collapses to site total when `tunnel_id` emission is disabled. | `state.TelemetryView.SessionsByTunnel` via `RegisterStateView`. | `newt_tunnel_sessions{site_id="self",tunnel_id="wgpub"} 2` | +| `newt_tunnel_bytes_total` | Counter (`By`) | Traffic accounting per tunnel, direction (`ingress`/`egress`), protocol (`tcp`/`udp`). | Proxy manager counting writers (`AddTunnelBytes`/`AddTunnelBytesSet`). | `newt_tunnel_bytes_total{direction="egress",protocol="tcp",site_id="self",tunnel_id="wgpub"} 8192` | +| `newt_tunnel_latency_seconds` | Histogram (`s`) | RTT samples from WireGuard stack and health pings per tunnel/transport. | `telemetry.ObserveTunnelLatency` from tunnel health checks. | `newt_tunnel_latency_seconds_bucket{transport="wireguard",le="0.05",tunnel_id="wgpub"} 4` | +| `newt_tunnel_reconnects_total` | Counter | Reconnect attempts bucketed by initiator (`client`/`server`) and reason enums. | `telemetry.IncReconnect` across websocket, WG, and utility flows. | `newt_tunnel_reconnects_total{initiator="client",reason="timeout",tunnel_id="wgpub"} 3` | +| `newt_connection_attempts_total` | Counter | Auth and WebSocket attempt counts by transport (`auth`, `websocket`) and result (`success`/`failure`). | `telemetry.IncConnAttempt` in auth/token and dial paths. | `newt_connection_attempts_total{transport="websocket",result="failure",site_id="self"} 2` | +| `newt_connection_errors_total` | Counter | Connection error tally keyed by transport and canonical error type (`dial_timeout`, `tls_handshake`, `auth_failed`, `io_error`). | `telemetry.IncConnError` in auth/websocket flows. | `newt_connection_errors_total{transport="auth",error_type="auth_failed",site_id="self"} 1` | +| `newt_config_reloads_total` | Counter | Successful/failed config reload attempts. | `telemetry.IncConfigReload` during WireGuard config reloads. | `newt_config_reloads_total{result="success",site_id="self"} 1` | +| `newt_restart_count_total` | Counter | Bumps to 1 at process boot for build info scrapers. | `telemetry.RegisterBuildInfo` called from `Init`. | `newt_restart_count_total{site_id="self"} 1` | +| `newt_config_apply_seconds` | Histogram (`s`) | Measures interface/peer apply duration per phase and result. | `telemetry.ObserveConfigApply` around config updates. | `newt_config_apply_seconds_bucket{phase="peer",result="success",le="0.1"} 5` | +| `newt_cert_rotation_total` | Counter | Certificate rotation events tagged by result. | `telemetry.IncCertRotation` during PKI updates. | `newt_cert_rotation_total{result="success",site_id="self"} 1` | +| `newt_build_info` | Observable gauge | Constant 1 with `version`/`commit` attributes to expose build metadata. | Callback registered in `registerBuildWSProxyInstruments`. | `newt_build_info{version="1.2.3",commit="abc123",site_id="self"} 1` | +| `newt_websocket_connect_latency_seconds` | Histogram (`s`) | Dial latency for Pangolin WebSocket connects annotated with result/error_type. | `telemetry.ObserveWSConnectLatency` inside `Client.establishConnection`. | `newt_websocket_connect_latency_seconds_bucket{result="success",transport="websocket",le="0.5"} 1` | +| `newt_websocket_messages_total` | Counter | Counts inbound/outbound WebSocket messages by direction and logical message type. | `telemetry.IncWSMessage` for ping/pong/text events. | `newt_websocket_messages_total{direction="out",msg_type="ping",site_id="self"} 4` | +| `newt_websocket_disconnects_total` | Counter | Tracks WebSocket disconnects grouped by `reason` (`shutdown`, `unexpected_close`, etc.) and `result`. | Emitted from `Client.readPumpWithDisconnectDetection` defer block. | `newt_websocket_disconnects_total{reason="unexpected_close",result="error",site_id="self"} 1` | +| `newt_websocket_keepalive_failures_total` | Counter | Failed WebSocket ping/pong keepalive attempts by reason. | Incremented in `Client.pingMonitor` when `WriteControl` fails. | `newt_websocket_keepalive_failures_total{reason="ping_write",site_id="self"} 1` | +| `newt_websocket_session_duration_seconds` | Histogram (`s`) | Duration of WebSocket sessions by outcome (`result`). | Observed when the read pump exits. | `newt_websocket_session_duration_seconds_sum{result="success",site_id="self"} 120` | +| `newt_proxy_active_connections` | Observable gauge | Active TCP/UDP proxy connections per tunnel and protocol. | Proxy manager callback via `SetProxyObservableCallback`. | `newt_proxy_active_connections{protocol="tcp",tunnel_id="wgpub"} 3` | +| `newt_proxy_buffer_bytes` | Observable gauge (`By`) | Size of proxy buffer pools (synchronous path) per tunnel/protocol. | Same proxy callback as above. | `newt_proxy_buffer_bytes{protocol="tcp",tunnel_id="wgpub"} 10240` | +| `newt_proxy_async_backlog_bytes` | Observable gauge (`By`) | Unflushed async byte backlog when deferred accounting is enabled. | Proxy callback when async accounting is turned on. | `newt_proxy_async_backlog_bytes{protocol="udp",tunnel_id="wgpub"} 4096` | +| `newt_proxy_drops_total` | Counter | Proxy write-drop events per protocol/tunnel. | `telemetry.IncProxyDrops` on UDP drop paths. | `newt_proxy_drops_total{protocol="udp",tunnel_id="wgpub"} 2` | +| `newt_proxy_accept_total` | Counter | Proxy accept attempts labelled by protocol, result, and reason. | `telemetry.IncProxyAccept` in TCP accept loop and UDP dial paths. | `newt_proxy_accept_total{protocol="tcp",result="failure",reason="timeout",site_id="self"} 1` | +| `newt_proxy_connection_duration_seconds` | Histogram (`s`) | Lifecycle duration for proxied TCP/UDP connections by result. | `telemetry.ObserveProxyConnectionDuration` when TCP/UDP handlers complete. | `newt_proxy_connection_duration_seconds_sum{protocol="udp",result="success",site_id="self"} 30` | + +In addition, Go runtime metrics are automatically exported when telemetry is initialised. 【F:internal/telemetry/telemetry.go†L147-L155】 + +## Tracing footprint +* Tracing is enabled only when OTLP export is turned on; `telemetry.Init` wires a batch `TracerProvider` and sets it globally. 【F:internal/telemetry/telemetry.go†L135-L155】 +* The admin HTTP mux (`/metrics`, `/healthz`) is wrapped with `otelhttp.NewHandler`, so any inbound admin requests produce spans. 【F:main.go†L373-L387】 +* WebSocket dials create a `ws.connect` span around the outbound handshake, but subsequent control-plane HTTP requests (token fetch, blueprint sync) use plain `http.Client` without propagation. 【F:websocket/client.go†L417-L459】 + +Overall span coverage is limited to the WebSocket connect loop and admin server; tunnel setup, Docker discovery, config application, and health pings currently emit only metrics. + +## Guideline & best-practice adherence +* **Resource & exporter configuration:** `telemetry.FromEnv` honours OTEL env-vars, sets service name/version, and promotes `site_id`/`region` resource attributes before building the provider. Exporters default to Prometheus with optional OTLP, aligning with OTel deployment guidance. 【F:internal/telemetry/telemetry.go†L56-L206】 +* **Low-cardinality enforcement:** A view-level attribute allow-list retains only approved keys (`tunnel_id`, `transport`, `protocol`, etc.), protecting Prometheus cardinality while still surfacing `site_id`/`region`. 【F:internal/telemetry/telemetry.go†L209-L231】 +* **Units and naming:** Instrument helpers enforce `_total` suffixes for counters, `_seconds` for durations, and attach `metric.WithUnit("By"|"s")` for size/time metrics, matching OTel semantic conventions. 【F:internal/telemetry/metrics.go†L23-L192】 +* **Runtime metrics & shutdown:** The runtime instrumentation is enabled, and `Setup.Shutdown` drains exporters in reverse order to avoid data loss. 【F:internal/telemetry/telemetry.go†L147-L261】 +* **Site-aware observables:** `state.TelemetryView` provides thread-safe snapshots to feed `newt_site_online`/`_last_heartbeat_seconds`/`_tunnel_sessions`, ensuring gauges report cohesive per-site data even when `tunnel_id` labels are disabled. 【F:internal/state/telemetry_view.go†L11-L79】 + +## Gaps & recommended improvements +1. **Tracing coverage:** Instrument the Pangolin REST calls (`getToken`, blueprint downloads) with `otelhttp.NewTransport` or explicit spans, and consider spans for WireGuard handshake/config apply to enable end-to-end traces when OTLP is on. 【F:websocket/client.go†L240-L360】 +2. **Histogram coverage:** Introduce `newt_site_registration_latency_seconds` (bootstrap) and `newt_ping_roundtrip_seconds` (heartbeat) to capture SLO-critical latencies before release. Existing latency buckets (`0.005s` → `30s`) can be reused. 【F:internal/telemetry/telemetry.go†L209-L218】 +3. **Control-plane throughput:** Add `newt_websocket_payload_bytes_total` (direction/msg_type) or reuse the tunnel counter with a `transport="websocket"` label to quantify command traffic volume and detect back-pressure. +4. **Docker discovery metrics:** If Docker auto-discovery is enabled, expose counters for container add/remove events and failures so operators can trace missing backends to discovery issues. + +## Pre-release metric backlog +Prior to GA, we recommend landing the following high-value instruments: +* **Bootstrap latency:** `newt_site_registration_latency_seconds` histogram emitted around the initial Pangolin registration HTTP call to detect slow control-plane responses. +* **Session duration:** `newt_websocket_session_duration_seconds` histogram recorded when a WebSocket closes (result + reason) to quantify stability. +* **Heartbeat lag:** `newt_ping_roundtrip_seconds` histogram from ping/pong monitors to capture tunnel health, complementing the heartbeat gauge. +* **Proxy accept errors:** `newt_proxy_accept_errors_total` counter keyed by protocol/reason to surface listener pressure distinct from data-plane drops. +* **Discovery events:** `newt_discovery_events_total` counter with `action` (`add`, `remove`, `error`) and `source` (`docker`, `file`) to audit service inventory churn. + +Implementing the above will round out visibility into control-plane responsiveness, connection stability, and discovery health while preserving the existing low-cardinality discipline. diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 1d11927..5403e43 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -45,14 +45,19 @@ var ( mBuildInfo metric.Int64ObservableGauge // WebSocket - mWSConnectLatency metric.Float64Histogram - mWSMessages metric.Int64Counter + mWSConnectLatency metric.Float64Histogram + mWSMessages metric.Int64Counter + mWSDisconnects metric.Int64Counter + mWSKeepaliveFailure metric.Int64Counter + mWSSessionDuration metric.Float64Histogram // Proxy mProxyActiveConns metric.Int64ObservableGauge mProxyBufferBytes metric.Int64ObservableGauge mProxyAsyncBacklogByte metric.Int64ObservableGauge mProxyDropsTotal metric.Int64Counter + mProxyAcceptsTotal metric.Int64Counter + mProxyConnDuration metric.Float64Histogram buildVersion string buildCommit string @@ -179,6 +184,13 @@ func registerBuildWSProxyInstruments() error { metric.WithUnit("s")) mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", metric.WithDescription("WebSocket messages by direction and type")) + mWSDisconnects, _ = meter.Int64Counter("newt_websocket_disconnects_total", + metric.WithDescription("WebSocket disconnects by reason/result")) + mWSKeepaliveFailure, _ = meter.Int64Counter("newt_websocket_keepalive_failures_total", + metric.WithDescription("WebSocket keepalive (ping/pong) failures")) + mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds", + metric.WithDescription("Duration of established WebSocket sessions"), + metric.WithUnit("s")) // Proxy mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", metric.WithDescription("Proxy active connections per tunnel and protocol")) @@ -190,6 +202,11 @@ func registerBuildWSProxyInstruments() error { metric.WithUnit("By")) mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", metric.WithDescription("Proxy drops due to write errors")) + mProxyAcceptsTotal, _ = meter.Int64Counter("newt_proxy_accept_total", + metric.WithDescription("Proxy connection accepts by protocol and result")) + mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds", + metric.WithDescription("Duration of completed proxy connections"), + metric.WithUnit("s")) // Register a default callback for build info if version/commit set reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if buildVersion == "" && buildCommit == "" { @@ -328,6 +345,25 @@ func IncWSMessage(ctx context.Context, direction, msgType string) { )...)) } +func IncWSDisconnect(ctx context.Context, reason, result string) { + mWSDisconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("reason", reason), + attribute.String("result", result), + )...)) +} + +func IncWSKeepaliveFailure(ctx context.Context, reason string) { + mWSKeepaliveFailure.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("reason", reason), + )...)) +} + +func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) { + mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( + attribute.String("result", result), + )...)) +} + // --- Proxy helpers --- func ObserveProxyActiveConnsObs(o metric.Observer, value int64, attrs []attribute.KeyValue) { @@ -352,6 +388,31 @@ func IncProxyDrops(ctx context.Context, tunnelID, protocol string) { mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) } +func IncProxyAccept(ctx context.Context, tunnelID, protocol, result, reason string) { + attrs := []attribute.KeyValue{ + attribute.String("protocol", protocol), + attribute.String("result", result), + } + if reason != "" { + attrs = append(attrs, attribute.String("reason", reason)) + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyAcceptsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + +func ObserveProxyConnectionDuration(ctx context.Context, tunnelID, protocol, result string, seconds float64) { + attrs := []attribute.KeyValue{ + attribute.String("protocol", protocol), + attribute.String("result", result), + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + // --- Config/PKI helpers --- func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) { diff --git a/main.go b/main.go index 959b906..83f7524 100644 --- a/main.go +++ b/main.go @@ -586,6 +586,10 @@ func main() { // Register handlers for different message types client.RegisterHandler("newt/wg/connect", func(msg websocket.WSMessage) { logger.Info("Received registration message") + regResult := "success" + defer func() { + telemetry.IncSiteRegistration(ctx, regResult) + }() if stopFunc != nil { stopFunc() // stop the ws from sending more requests stopFunc = nil // reset stopFunc to nil to avoid double stopping @@ -605,11 +609,13 @@ func main() { jsonData, err := json.Marshal(msg.Data) if err != nil { logger.Info(fmtErrMarshaling, err) + regResult = "failure" return } if err := json.Unmarshal(jsonData, &wgData); err != nil { logger.Info("Error unmarshaling target data: %v", err) + regResult = "failure" return } @@ -620,6 +626,7 @@ func main() { mtuInt) if err != nil { logger.Error("Failed to create TUN device: %v", err) + regResult = "failure" } setDownstreamTNetstack(tnet) @@ -633,6 +640,7 @@ func main() { host, _, err := net.SplitHostPort(wgData.Endpoint) if err != nil { logger.Error("Failed to split endpoint: %v", err) + regResult = "failure" return } @@ -641,6 +649,7 @@ func main() { endpoint, err := resolveDomain(wgData.Endpoint) if err != nil { logger.Error("Failed to resolve endpoint: %v", err) + regResult = "failure" return } @@ -656,12 +665,14 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub err = dev.IpcSet(config) if err != nil { logger.Error("Failed to configure WireGuard device: %v", err) + regResult = "failure" } // Bring up the device err = dev.Up() if err != nil { logger.Error("Failed to bring up WireGuard device: %v", err) + regResult = "failure" } logger.Debug("WireGuard device created. Lets ping the server now...") @@ -676,10 +687,11 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Testing initial connection with reliable ping...") lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) if err == nil && wgData.PublicKey != "" { - telemetry.ObserveTunnelLatency(context.Background(), wgData.PublicKey, "wireguard", lat.Seconds()) + telemetry.ObserveTunnelLatency(ctx, wgData.PublicKey, "wireguard", lat.Seconds()) } if err != nil { logger.Warn("Initial reliable ping failed, but continuing: %v", err) + regResult = "failure" } else { logger.Info("Initial connection test successful") } @@ -701,9 +713,6 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub connected = true - // telemetry: record a successful site registration (omit region unless available) - telemetry.IncSiteRegistration(context.Background(), "success") - // add the targets if there are any if len(wgData.Targets.TCP) > 0 { updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) @@ -738,7 +747,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { logger.Info("Received reconnect message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) + telemetry.IncReconnect(ctx, wgData.PublicKey, "server", telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN @@ -767,7 +776,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { logger.Info("Received termination message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) + telemetry.IncReconnect(ctx, wgData.PublicKey, "server", telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN @@ -837,7 +846,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub }, } -stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ + stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, @@ -940,7 +949,7 @@ stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ } // Send the ping results to the cloud for selection -stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ + stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, diff --git a/proxy/manager.go b/proxy/manager.go index ceaa12b..31e7788 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "errors" "fmt" "io" "net" @@ -97,6 +98,32 @@ func (cw *countingWriter) Write(p []byte) (int, error) { return n, err } +func classifyProxyError(err error) string { + if err == nil { + return "" + } + if errors.Is(err, net.ErrClosed) { + return "closed" + } + if ne, ok := err.(net.Error); ok { + if ne.Timeout() { + return "timeout" + } + if ne.Temporary() { + return "temporary" + } + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "refused"): + return "refused" + case strings.Contains(msg, "reset"): + return "reset" + default: + return "io_error" + } +} + // NewProxyManager creates a new proxy manager instance func NewProxyManager(tnet *netstack.Net) *ProxyManager { return &ProxyManager{ @@ -467,72 +494,69 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) for { conn, err := listener.Accept() if err != nil { - // Check if we're shutting down or the listener was closed + telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "tcp", "failure", classifyProxyError(err)) if !pm.running { return } - - // Check for specific network errors that indicate the listener is closed if ne, ok := err.(net.Error); ok && !ne.Temporary() { logger.Info("TCP listener closed, stopping proxy handler for %v", listener.Addr()) return } - logger.Error("Error accepting TCP connection: %v", err) - // Don't hammer the CPU if we hit a temporary error time.Sleep(100 * time.Millisecond) continue } - // Count sessions only once per accepted TCP connection - if pm.currentTunnelID != "" { - state.Global().IncSessions(pm.currentTunnelID) - if e := pm.getEntry(pm.currentTunnelID); e != nil { + tunnelID := pm.currentTunnelID + telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "") + if tunnelID != "" { + state.Global().IncSessions(tunnelID) + if e := pm.getEntry(tunnelID); e != nil { e.activeTCP.Add(1) } } - go func() { + go func(tunnelID string, accepted net.Conn) { + connStart := time.Now() target, err := net.Dial("tcp", targetAddr) if err != nil { logger.Error("Error connecting to target: %v", err) - conn.Close() + accepted.Close() + telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err)) + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds()) return } - // already incremented on accept - - // Create a WaitGroup to ensure both copy operations complete + entry := pm.getEntry(tunnelID) + if entry == nil { + entry = &tunnelEntry{} + } var wg sync.WaitGroup wg.Add(2) - // client -> target (direction=in) - go func() { + go func(ent *tunnelEntry) { defer wg.Done() - e := pm.getEntry(pm.currentTunnelID) - cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"} - _, _ = io.Copy(cw, conn) + cw := &countingWriter{ctx: context.Background(), w: target, set: ent.attrInTCP, pm: pm, ent: ent, out: false, proto: "tcp"} + _, _ = io.Copy(cw, accepted) _ = target.Close() - }() + }(entry) - // target -> client (direction=out) - go func() { + go func(ent *tunnelEntry) { defer wg.Done() - e := pm.getEntry(pm.currentTunnelID) - cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"} + cw := &countingWriter{ctx: context.Background(), w: accepted, set: ent.attrOutTCP, pm: pm, ent: ent, out: true, proto: "tcp"} _, _ = io.Copy(cw, target) - _ = conn.Close() - }() + _ = accepted.Close() + }(entry) - // Wait for both copies to complete then session -1 wg.Wait() - if pm.currentTunnelID != "" { - state.Global().DecSessions(pm.currentTunnelID) - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if tunnelID != "" { + state.Global().DecSessions(tunnelID) + if e := pm.getEntry(tunnelID); e != nil { e.activeTCP.Add(-1) } } - }() + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds()) + }(tunnelID, conn) } } @@ -595,16 +619,20 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { targetUDPAddr, err := net.ResolveUDPAddr("udp", targetAddr) if err != nil { logger.Error("Error resolving target address: %v", err) + telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", "resolve") continue } targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) if err != nil { logger.Error("Error connecting to target: %v", err) + telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err)) continue } + tunnelID := pm.currentTunnelID + telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "") // Only increment activeUDP after a successful DialUDP - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if e := pm.getEntry(tunnelID); e != nil { e.activeUDP.Add(1) } @@ -612,18 +640,21 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { clientConns[clientKey] = targetConn clientsMutex.Unlock() - go func(clientKey string, targetConn *net.UDPConn, remoteAddr net.Addr) { + go func(clientKey string, targetConn *net.UDPConn, remoteAddr net.Addr, tunnelID string) { + start := time.Now() + result := "success" defer func() { // Always clean up when this goroutine exits clientsMutex.Lock() if storedConn, exists := clientConns[clientKey]; exists && storedConn == targetConn { delete(clientConns, clientKey) targetConn.Close() - if e := pm.getEntry(pm.currentTunnelID); e != nil { + if e := pm.getEntry(tunnelID); e != nil { e.activeUDP.Add(-1) } } clientsMutex.Unlock() + telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds()) }() buffer := make([]byte, 65507) @@ -631,6 +662,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { n, _, err := targetConn.ReadFromUDP(buffer) if err != nil { logger.Error("Error reading from target: %v", err) + result = "failure" return // defer will handle cleanup } @@ -651,10 +683,11 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { if err != nil { logger.Error("Error writing to client: %v", err) telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") + result = "failure" return // defer will handle cleanup } } - }(clientKey, targetConn, remoteAddr) + }(clientKey, targetConn, remoteAddr, tunnelID) } written, err := targetConn.Write(buffer[:n]) diff --git a/websocket/client.go b/websocket/client.go index ee6c2e6..ba39202 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "net/url" "os" @@ -42,6 +43,8 @@ type Client struct { writeMux sync.Mutex clientType string // Type of client (e.g., "newt", "olm") tlsConfig TLSConfig + metricsCtxMu sync.RWMutex + metricsCtx context.Context } type ClientOption func(*Client) @@ -85,6 +88,26 @@ func (c *Client) OnTokenUpdate(callback func(token string)) { c.onTokenUpdate = callback } +func (c *Client) metricsContext() context.Context { + c.metricsCtxMu.RLock() + defer c.metricsCtxMu.RUnlock() + if c.metricsCtx != nil { + return c.metricsCtx + } + return context.Background() +} + +func (c *Client) setMetricsContext(ctx context.Context) { + c.metricsCtxMu.Lock() + c.metricsCtx = ctx + c.metricsCtxMu.Unlock() +} + +// MetricsContext exposes the context used for telemetry emission when a connection is active. +func (c *Client) MetricsContext() context.Context { + return c.metricsContext() +} + // NewClient creates a new websocket client func NewClient(clientType string, ID, secret string, endpoint string, pingInterval time.Duration, pingTimeout time.Duration, opts ...ClientOption) (*Client, error) { config := &Config{ @@ -177,7 +200,7 @@ func (c *Client) SendMessage(messageType string, data interface{}) error { if err := c.conn.WriteJSON(msg); err != nil { return err } - telemetry.IncWSMessage(context.Background(), "out", "text") + telemetry.IncWSMessage(c.metricsContext(), "out", "text") return nil } @@ -273,8 +296,12 @@ func (c *Client) getToken() (string, error) { return "", fmt.Errorf("failed to marshal token request data: %w", err) } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + // Create a new request - req, err := http.NewRequest( + req, err := http.NewRequestWithContext( + ctx, "POST", baseEndpoint+"/api/v1/auth/"+c.clientType+"/get-token", bytes.NewBuffer(jsonData), @@ -296,7 +323,8 @@ func (c *Client) getToken() (string, error) { } resp, err := client.Do(req) if err != nil { - telemetry.IncConnError(context.Background(), "auth", classifyConnError(err)) + telemetry.IncConnAttempt(ctx, "auth", "failure") + telemetry.IncConnError(ctx, "auth", classifyConnError(err)) return "", fmt.Errorf("failed to request new token: %w", err) } defer resp.Body.Close() @@ -304,15 +332,15 @@ func (c *Client) getToken() (string, error) { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) - telemetry.IncConnAttempt(context.Background(), "auth", "failure") + telemetry.IncConnAttempt(ctx, "auth", "failure") etype := "io_error" if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { etype = "auth_failed" } - telemetry.IncConnError(context.Background(), "auth", etype) + telemetry.IncConnError(ctx, "auth", etype) // Reconnect reason mapping for auth failures if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { - telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonAuthError) + telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonAuthError) } return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) } @@ -332,7 +360,7 @@ func (c *Client) getToken() (string, error) { } logger.Debug("Received token: %s", tokenResp.Data.Token) - telemetry.IncConnAttempt(context.Background(), "auth", "success") + telemetry.IncConnAttempt(ctx, "auth", "success") return tokenResp.Data.Token, nil } @@ -357,6 +385,30 @@ func classifyConnError(err error) string { } } +func classifyWSDisconnect(err error) (result, reason string) { + if err == nil { + return "success", "normal" + } + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { + return "success", "normal" + } + if ne, ok := err.(net.Error); ok && ne.Timeout() { + return "error", "timeout" + } + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + return "error", "unexpected_close" + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "eof"): + return "error", "eof" + case strings.Contains(msg, "reset"): + return "error", "connection_reset" + default: + return "error", "read_error" + } +} + func (c *Client) connectWithRetry() { for { select { @@ -375,13 +427,13 @@ func (c *Client) connectWithRetry() { } func (c *Client) establishConnection() error { + ctx := context.Background() + // Get token for authentication token, err := c.getToken() if err != nil { - // telemetry: connection attempt failed before dialing - // site_id isn't globally available here; use client ID as site_id (low cardinality) - telemetry.IncConnAttempt(context.Background(), "websocket", "failure") - telemetry.IncConnError(context.Background(), "websocket", classifyConnError(err)) + telemetry.IncConnAttempt(ctx, "websocket", "failure") + telemetry.IncConnError(ctx, "websocket", classifyConnError(err)) return fmt.Errorf("failed to get token: %w", err) } @@ -416,7 +468,7 @@ func (c *Client) establishConnection() error { // Connect to WebSocket (optional span) tr := otel.Tracer("newt") - spanCtx, span := tr.Start(context.Background(), "ws.connect") + ctx, span := tr.Start(ctx, "ws.connect") defer span.End() start := time.Now() @@ -441,38 +493,40 @@ func (c *Client) establishConnection() error { logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") } - conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) + conn, _, err := dialer.DialContext(ctx, u.String(), nil) lat := time.Since(start).Seconds() if err != nil { - telemetry.IncConnAttempt(context.Background(), "websocket", "failure") + telemetry.IncConnAttempt(ctx, "websocket", "failure") etype := classifyConnError(err) - telemetry.IncConnError(context.Background(), "websocket", etype) - telemetry.ObserveWSConnectLatency(context.Background(), lat, "failure", etype) + telemetry.IncConnError(ctx, "websocket", etype) + telemetry.ObserveWSConnectLatency(ctx, lat, "failure", etype) // Map handshake-related errors to reconnect reasons where appropriate if etype == "tls_handshake" { - telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonHandshakeError) + telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonHandshakeError) } else if etype == "dial_timeout" { - telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonTimeout) + telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonTimeout) } else { - telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonError) + telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError) } return fmt.Errorf("failed to connect to WebSocket: %w", err) } - telemetry.IncConnAttempt(context.Background(), "websocket", "success") - telemetry.ObserveWSConnectLatency(context.Background(), lat, "success", "") + telemetry.IncConnAttempt(ctx, "websocket", "success") + telemetry.ObserveWSConnectLatency(ctx, lat, "success", "") c.conn = conn c.setConnected(true) + c.setMetricsContext(ctx) + sessionStart := time.Now() // Wire up pong handler for metrics c.conn.SetPongHandler(func(appData string) error { - telemetry.IncWSMessage(context.Background(), "in", "pong") + telemetry.IncWSMessage(c.metricsContext(), "in", "pong") return nil }) // Start the ping monitor go c.pingMonitor() // Start the read pump with disconnect detection - go c.readPumpWithDisconnectDetection() + go c.readPumpWithDisconnectDetection(sessionStart) if c.onConnect != nil { err := c.saveConfig() @@ -566,7 +620,7 @@ func (c *Client) pingMonitor() { c.writeMux.Lock() err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout)) if err == nil { - telemetry.IncWSMessage(context.Background(), "out", "ping") + telemetry.IncWSMessage(c.metricsContext(), "out", "ping") } c.writeMux.Unlock() if err != nil { @@ -577,6 +631,7 @@ func (c *Client) pingMonitor() { return default: logger.Error("Ping failed: %v", err) + telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") c.reconnect() return } @@ -586,11 +641,19 @@ func (c *Client) pingMonitor() { } // readPumpWithDisconnectDetection reads messages and triggers reconnect on error -func (c *Client) readPumpWithDisconnectDetection() { +func (c *Client) readPumpWithDisconnectDetection(started time.Time) { + ctx := c.metricsContext() + disconnectReason := "shutdown" + disconnectResult := "success" + defer func() { if c.conn != nil { c.conn.Close() } + if !started.IsZero() { + telemetry.ObserveWSSessionDuration(ctx, time.Since(started).Seconds(), disconnectResult) + } + telemetry.IncWSDisconnect(ctx, disconnectReason, disconnectResult) // Only attempt reconnect if we're not shutting down select { case <-c.done: @@ -604,12 +667,14 @@ func (c *Client) readPumpWithDisconnectDetection() { for { select { case <-c.done: + disconnectReason = "shutdown" + disconnectResult = "success" return default: var msg WSMessage err := c.conn.ReadJSON(&msg) if err == nil { - telemetry.IncWSMessage(context.Background(), "in", "text") + telemetry.IncWSMessage(c.metricsContext(), "in", "text") } if err != nil { // Check if we're shutting down before logging error @@ -617,13 +682,18 @@ func (c *Client) readPumpWithDisconnectDetection() { case <-c.done: // Expected during shutdown, don't log as error logger.Debug("WebSocket connection closed during shutdown") + disconnectReason = "shutdown" + disconnectResult = "success" return default: // Unexpected error during normal operation - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { - logger.Error("WebSocket read error: %v", err) - } else { - logger.Debug("WebSocket connection closed: %v", err) + disconnectResult, disconnectReason = classifyWSDisconnect(err) + if disconnectResult == "error" { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { + logger.Error("WebSocket read error: %v", err) + } else { + logger.Debug("WebSocket connection closed: %v", err) + } } return // triggers reconnect via defer } diff --git a/wg/wg.go b/wg/wg.go index eccd64f..0ab1919 100644 --- a/wg/wg.go +++ b/wg/wg.go @@ -280,6 +280,15 @@ func (s *WireGuardService) LoadRemoteConfig() error { } func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { + ctx := context.Background() + if s.client != nil { + ctx = s.client.MetricsContext() + } + result := "success" + defer func() { + telemetry.IncConfigReload(ctx, result) + }() + var config WgConfig logger.Debug("Received message: %v", msg) @@ -288,11 +297,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { jsonData, err := json.Marshal(msg.Data) if err != nil { logger.Info("Error marshaling data: %v", err) + result = "failure" return } if err := json.Unmarshal(jsonData, &config); err != nil { logger.Info("Error unmarshaling target data: %v", err) + result = "failure" return } s.config = config @@ -303,27 +314,28 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { } // telemetry: config reload success - telemetry.IncConfigReload(context.Background(), "success") // Optional reconnect reason mapping: config change if s.serverPubKey != "" { - telemetry.IncReconnect(context.Background(), s.serverPubKey, "client", telemetry.ReasonConfigChange) + telemetry.IncReconnect(ctx, s.serverPubKey, "client", telemetry.ReasonConfigChange) } // Ensure the WireGuard interface and peers are configured start := time.Now() if err := s.ensureWireguardInterface(config); err != nil { logger.Error("Failed to ensure WireGuard interface: %v", err) - telemetry.ObserveConfigApply(context.Background(), "interface", "failure", time.Since(start).Seconds()) + telemetry.ObserveConfigApply(ctx, "interface", "failure", time.Since(start).Seconds()) + result = "failure" } else { - telemetry.ObserveConfigApply(context.Background(), "interface", "success", time.Since(start).Seconds()) + telemetry.ObserveConfigApply(ctx, "interface", "success", time.Since(start).Seconds()) } startPeers := time.Now() if err := s.ensureWireguardPeers(config.Peers); err != nil { logger.Error("Failed to ensure WireGuard peers: %v", err) - telemetry.ObserveConfigApply(context.Background(), "peer", "failure", time.Since(startPeers).Seconds()) + telemetry.ObserveConfigApply(ctx, "peer", "failure", time.Since(startPeers).Seconds()) + result = "failure" } else { - telemetry.ObserveConfigApply(context.Background(), "peer", "success", time.Since(startPeers).Seconds()) + telemetry.ObserveConfigApply(ctx, "peer", "success", time.Since(startPeers).Seconds()) } }