diff --git a/docs/METRICS_RECOMMENDATIONS.md b/docs/METRICS_RECOMMENDATIONS.md index c085e06..968b0a9 100644 --- a/docs/METRICS_RECOMMENDATIONS.md +++ b/docs/METRICS_RECOMMENDATIONS.md @@ -6,20 +6,20 @@ This document captures the current state of Newt metrics, prioritized fixes, and - Export: Prometheus exposition (default), optional OTLP (gRPC) - Existing instruments: - - Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_seconds + - Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_timestamp_seconds - Tunnel/Traffic: newt_tunnel_sessions, newt_tunnel_bytes_total, newt_tunnel_latency_seconds, newt_tunnel_reconnects_total - Connection lifecycle: newt_connection_attempts_total, newt_connection_errors_total - - Operations: newt_config_reloads_total, newt_restart_count_total, newt_build_info - - Operations: newt_config_reloads_total, newt_restart_count_total, newt_config_apply_seconds, newt_cert_rotation_total + - Operations: newt_config_reloads_total, process_start_time_seconds, newt_build_info + - Operations: newt_config_reloads_total, process_start_time_seconds, newt_config_apply_seconds, newt_cert_rotation_total - Build metadata: newt_build_info - - Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total - - Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total + - Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total, newt_websocket_connected, newt_websocket_reconnects_total + - Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total, newt_proxy_accept_total, newt_proxy_connection_duration_seconds, newt_proxy_connections_total - Go runtime: GC, heap, goroutines via runtime instrumentation 2) Main issues addressed now - Attribute filter (allow-list) extended to include site_id and region in addition to 
existing keys (tunnel_id, transport, protocol, direction, result, reason, error_type, version, commit). - - site_id and region propagation: site_id is now attached as a metric label across newt_*; region is added as a metric label when set. Both remain resource attributes for consistency with OTEL. + - site_id and region propagation: site_id/region remain resource attributes. Metric labels mirror them on per-site gauges and counters by default; set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them for multi-tenant scrapes. - Label semantics clarified: - transport: control-plane mechanism (e.g., websocket, wireguard) - protocol: L4 payload type (tcp, udp) @@ -29,10 +29,9 @@ This document captures the current state of Newt metrics, prioritized fixes, and 3) Remaining gaps and deviations - Some call sites still need initiator label on reconnect outcomes (client vs server). This is planned. - - WebSocket and Proxy metrics (connect latency, messages, active connections, buffer/drops, async backlog) are planned additions. - Config apply duration and cert rotation counters are planned. - Registration and config reload failures are not yet emitted; add failure code paths so result labels expose churn. - - Restart counter increments only when build metadata is provided; consider decoupling to count all boots. + - Document using `process_start_time_seconds` (and `time()` in PromQL) to derive uptime; no explicit restart counter is needed. - Metric helpers often use `context.Background()`. Where lightweight contexts exist (e.g., HTTP handlers), propagate them to ease future correlation. - Tracing coverage is limited to admin HTTP and WebSocket connect spans; extend to blueprint fetches, proxy accept loops, and WireGuard updates when OTLP is enabled. 
@@ -44,8 +43,6 @@ This document captures the current state of Newt metrics, prioritized fixes, and - Correct label semantics (transport vs protocol); fix sessions transport labelling - Documentation alignment - Phase 2 (next) - - WebSocket: newt_websocket_connect_latency_seconds; newt_websocket_messages_total{direction,msg_type} - - Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_drops_total, newt_proxy_async_backlog_bytes - Reconnect: add initiator label (client/server) - Config & PKI: newt_config_apply_seconds{phase,result}; newt_cert_rotation_total{result} - WebSocket disconnect and keepalive failure counters @@ -66,7 +63,7 @@ This document captures the current state of Newt metrics, prioritized fixes, and - Sustained connection errors: - rate(newt_connection_errors_total[5m]) by (site_id,transport,error_type) - Heartbeat gaps: - - max_over_time(newt_site_last_heartbeat_seconds[15m]) by (site_id) + - time() - max_over_time(newt_site_last_heartbeat_timestamp_seconds[15m]) by (site_id) - Proxy drops: - increase(newt_proxy_drops_total[5m]) by (site_id,protocol) - WebSocket connect p95 (when added): diff --git a/docs/observability.md b/docs/observability.md index ba19aac..42f0cb1 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -27,6 +27,7 @@ Enable exporters via environment variables (no code changes required) - OTEL_RESOURCE_ATTRIBUTES=service.instance.id=,site_id= - OTEL_METRIC_EXPORT_INTERVAL=15s (default) - NEWT_ADMIN_ADDR=127.0.0.1:2112 (default admin HTTP with /metrics) +- NEWT_METRICS_INCLUDE_SITE_LABELS=true|false (default: true; disable to drop site_id/region as metric labels and rely on resource attributes only) Runtime behavior @@ -36,12 +37,14 @@ Runtime behavior Metric catalog (current) +Unless otherwise noted, `site_id` and `region` are available via resource attributes and, by default, as metric labels. 
Set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them from counter/histogram label sets in high-cardinality environments. + | Metric | Instrument | Key attributes | Purpose | Example | | --- | --- | --- | --- | --- | -| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0",site_id="acme-edge-1"} 1` | +| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional when site labels enabled) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0"} 1` | | `newt_site_registrations_total` | Counter (Int64) | `result` (`success`/`failure`), `site_id`, `region` (optional) | Counts Pangolin registration attempts. | `newt_site_registrations_total{result="success",site_id="acme-edge-1"} 1` | | `newt_site_online` | Observable gauge (Int64) | `site_id` | Reports whether the site is currently connected (`1`) or offline (`0`). | `newt_site_online{site_id="acme-edge-1"} 1` | -| `newt_site_last_heartbeat_seconds` | Observable gauge (Float64) | `site_id` | Time since the most recent Pangolin heartbeat. | `newt_site_last_heartbeat_seconds{site_id="acme-edge-1"} 2.4` | +| `newt_site_last_heartbeat_timestamp_seconds` | Observable gauge (Float64) | `site_id` | Unix timestamp of the most recent Pangolin heartbeat (derive age via `time() - metric`). | `newt_site_last_heartbeat_timestamp_seconds{site_id="acme-edge-1"} 1.728e+09` | | `newt_tunnel_sessions` | Observable gauge (Int64) | `site_id`, `tunnel_id` (when enabled) | Counts active tunnel sessions per peer; collapses to per-site when tunnel IDs are disabled. 
| `newt_tunnel_sessions{site_id="acme-edge-1",tunnel_id="wgpub..."} 3` | | `newt_tunnel_bytes_total` | Counter (Int64) | `direction` (`ingress`/`egress`), `protocol` (`tcp`/`udp`), `tunnel_id` (optional), `site_id`, `region` (optional) | Measures proxied traffic volume across tunnels. | `newt_tunnel_bytes_total{direction="ingress",protocol="tcp",site_id="acme-edge-1"} 4096` | | `newt_tunnel_latency_seconds` | Histogram (Float64) | `transport` (e.g., `wireguard`), `tunnel_id` (optional), `site_id`, `region` (optional) | Captures RTT or configuration-driven latency samples. | `newt_tunnel_latency_seconds_bucket{transport="wireguard",le="0.5"} 42` | @@ -49,15 +52,18 @@ Metric catalog (current) | `newt_connection_attempts_total` | Counter (Int64) | `transport` (`auth`/`websocket`), `result`, `site_id`, `region` (optional) | Measures control-plane dial attempts and their outcomes. | `newt_connection_attempts_total{transport="websocket",result="success",site_id="acme-edge-1"} 8` | | `newt_connection_errors_total` | Counter (Int64) | `transport`, `error_type`, `site_id`, `region` (optional) | Buckets connection failures by normalized error class. | `newt_connection_errors_total{transport="websocket",error_type="tls_handshake",site_id="acme-edge-1"} 1` | | `newt_config_reloads_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Counts remote blueprint/config reloads. | `newt_config_reloads_total{result="success",site_id="acme-edge-1"} 3` | -| `newt_restart_count_total` | Counter (Int64) | `site_id`, `region` (optional) | Increments once per process boot to detect restarts. | `newt_restart_count_total{site_id="acme-edge-1"} 1` | +| `process_start_time_seconds` | Observable gauge (Float64) | — | Unix timestamp of the Newt process start time (use `time() - process_start_time_seconds` for uptime). 
| `process_start_time_seconds 1.728e+09` | | `newt_config_apply_seconds` | Histogram (Float64) | `phase` (`interface`/`peer`), `result`, `site_id`, `region` (optional) | Measures time spent applying WireGuard configuration phases. | `newt_config_apply_seconds_sum{phase="peer",result="success",site_id="acme-edge-1"} 0.48` | | `newt_cert_rotation_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Tracks client certificate rotation attempts. | `newt_cert_rotation_total{result="success",site_id="acme-edge-1"} 2` | | `newt_websocket_connect_latency_seconds` | Histogram (Float64) | `transport="websocket"`, `result`, `error_type` (on failure), `site_id`, `region` (optional) | Measures WebSocket dial latency and exposes failure buckets. | `newt_websocket_connect_latency_seconds_bucket{result="success",le="0.5",site_id="acme-edge-1"} 9` | | `newt_websocket_messages_total` | Counter (Int64) | `direction` (`in`/`out`), `msg_type` (`text`/`ping`/`pong`), `site_id`, `region` (optional) | Accounts for control WebSocket traffic volume by type. | `newt_websocket_messages_total{direction="out",msg_type="ping",site_id="acme-edge-1"} 12` | +| `newt_websocket_connected` | Observable gauge (Int64) | `site_id`, `region` (optional) | Reports current WebSocket connectivity (`1` when connected). | `newt_websocket_connected{site_id="acme-edge-1"} 1` | +| `newt_websocket_reconnects_total` | Counter (Int64) | `reason` (`tls_handshake`, `dial_timeout`, `io_error`, `ping_write`, `timeout`, etc.), `site_id`, `region` (optional) | Counts reconnect attempts with normalized reasons for failure analysis. | `newt_websocket_reconnects_total{reason="timeout",site_id="acme-edge-1"} 3` | | `newt_proxy_active_connections` | Observable gauge (Int64) | `protocol` (`tcp`/`udp`), `direction` (`ingress`/`egress`), `tunnel_id` (optional), `site_id`, `region` (optional) | Current proxy connections per tunnel and protocol. 
| `newt_proxy_active_connections{protocol="tcp",direction="egress",site_id="acme-edge-1"} 4` | | `newt_proxy_buffer_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Volume of buffered data awaiting flush in proxy queues. | `newt_proxy_buffer_bytes{protocol="udp",direction="egress",site_id="acme-edge-1"} 2048` | | `newt_proxy_async_backlog_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks async write backlog when deferred flushing is enabled. | `newt_proxy_async_backlog_bytes{protocol="tcp",direction="egress",site_id="acme-edge-1"} 512` | | `newt_proxy_drops_total` | Counter (Int64) | `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Counts proxy drop events caused by downstream write errors. | `newt_proxy_drops_total{protocol="udp",site_id="acme-edge-1"} 1` | +| `newt_proxy_connections_total` | Counter (Int64) | `event` (`opened`/`closed`), `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks proxy connection lifecycle events for rate/SLO calculations. | `newt_proxy_connections_total{event="opened",protocol="tcp",site_id="acme-edge-1"} 10` | Conventions @@ -174,7 +180,7 @@ sum(newt_tunnel_sessions) Compatibility notes - Gauges do not use the _total suffix (e.g., newt_tunnel_sessions). -- site_id is emitted as both resource attribute and metric label on all newt_* series; region is included as a metric label only when set. tunnel_id is a metric label (WireGuard public key). Never expose secrets in labels. +- site_id/region remain resource attributes. Metric labels for these fields appear on per-site gauges (e.g., `newt_site_online`) and, by default, on counters/histograms; disable them with `NEWT_METRICS_INCLUDE_SITE_LABELS=false` if needed. `tunnel_id` is a metric label (WireGuard public key). Never expose secrets in labels. 
- NEWT_METRICS_INCLUDE_TUNNEL_ID (default: true) toggles whether tunnel_id is included as a label on bytes/sessions/proxy/reconnect metrics. Disable in high-cardinality environments. - Avoid double-scraping: scrape either Newt (/metrics) or the Collector's Prometheus exporter, not both. - Prometheus does not accept remote_write; use Mimir/Cortex/VM/Thanos-Receive for remote_write. diff --git a/docs/otel-review.md b/docs/otel-review.md index ba824e7..1d49d02 100644 --- a/docs/otel-review.md +++ b/docs/otel-review.md @@ -16,7 +16,7 @@ A global attribute filter (see `buildMeterProvider`) constrains exposed label ke exports stay bounded. - **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and - `newt_site_last_heartbeat_seconds` capture registration attempts and liveness. They + `newt_site_last_heartbeat_timestamp_seconds` capture registration attempts and liveness. They are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state callback that publishes observable gauges for the active site. - **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects, @@ -27,17 +27,20 @@ exports stay bounded. `newt_connection_errors_total` are emitted throughout the WebSocket client to classify authentication, dial, and transport failures. - **Operations/configuration**: `newt_config_reloads_total`, - `newt_restart_count_total`, `newt_config_apply_seconds`, and + `process_start_time_seconds`, `newt_config_apply_seconds`, and `newt_cert_rotation_total` provide visibility into blueprint reloads, process boots, configuration timings, and certificate rotation outcomes. - **Build metadata**: `newt_build_info` records the binary version/commit together - with a monotonic restart counter when build information is supplied at startup. -- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds` and - `newt_websocket_messages_total` report connect latency and ping/pong/text activity. 
+ with optional site metadata when build information is supplied at startup. +- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds`, + `newt_websocket_messages_total`, `newt_websocket_connected`, and + `newt_websocket_reconnects_total` report connect latency, ping/pong/text activity, + connection state, and reconnect reasons. - **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`, - `newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) and the - `newt_proxy_drops_total` counter are fed from the proxy manager to monitor backlog - and drop behaviour alongside per-protocol byte counters. + `newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) plus counters for + drops, accepts, connection lifecycle events (`newt_proxy_connections_total`), and + duration histograms (`newt_proxy_connection_duration_seconds`) surface backlog, + drop behaviour, and churn alongside per-protocol byte counters. Refer to `docs/observability.md` for a tabular catalogue with instrument types, attributes, and sample exposition lines. @@ -61,8 +64,9 @@ The implementation adheres to most OTel Go recommendations: suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are registered with explicit second-based buckets. - **Resource attributes** – Service name/version and optional `site_id`/`region` - populate the `resource.Resource` and are also injected as metric attributes for - compatibility with Prometheus queries. + populate the `resource.Resource`. Metric labels mirror these by default (and on + per-site gauges) but can be disabled with `NEWT_METRICS_INCLUDE_SITE_LABELS=false` + to avoid unnecessary cardinality growth. - **Attribute hygiene** – A single attribute filter (`sdkmetric.WithView`) enforces the allow-list of label keys to prevent accidental high-cardinality emission. 
- **Runtime metrics** – Go runtime instrumentation is enabled automatically through @@ -83,10 +87,9 @@ The review identified a few actionable adjustments: 2. **Surface config reload failures** – `telemetry.IncConfigReload` is invoked with `result="success"` only. Callers should record a failure result when blueprint parsing or application aborts before success counters are incremented. -3. **Harmonise restart count behaviour** – `newt_restart_count_total` increments only - when build metadata is provided. Consider moving the increment out of - `RegisterBuildInfo` so the counter advances even for ad-hoc builds without version - strings. +3. **Expose robust uptime** – Document using `time() - process_start_time_seconds` + to derive uptime now that the restart counter has been replaced with a timestamp + gauge. 4. **Propagate contexts where available** – Many emitters call metric helpers with `context.Background()`. Passing real contexts (when inexpensive) would allow future exporters to correlate spans and metrics. @@ -98,17 +101,17 @@ The review identified a few actionable adjustments: Prioritised additions that would close visibility gaps: -1. **WebSocket disconnect outcomes** – A counter (e.g., `newt_websocket_disconnects_total`) - partitioned by `reason` would complement the existing connect latency histogram and - explain reconnect storms. -2. **Keepalive/heartbeat failures** – Counting ping timeouts or heartbeat misses would - make `newt_site_last_heartbeat_seconds` actionable by providing discrete events. -3. **Proxy connection lifecycle** – Add counters/histograms for proxy accept events and - connection durations to correlate drops with load and backlog metrics. +1. **Config reload error taxonomy** – Split reload attempts into a dedicated + `newt_config_reload_errors_total{phase}` counter to make blueprint validation failures + visible alongside the existing success counter. +2. 
**Config source visibility** – Export `newt_config_source_info{source,version}` so + operators can audit the active blueprint origin/commit during incidents. +3. **Certificate expiry** – Emit `newt_cert_expiry_timestamp_seconds` (per cert) to + enable proactive alerts before mTLS credentials lapse. 4. **Blueprint/config pull latency** – Measuring Pangolin blueprint fetch durations and HTTP status distribution would expose slow control-plane operations. -5. **Certificate rotation attempts** – Complement `newt_cert_rotation_total` with a - duration histogram to observe slow PKI updates and detect stuck rotations. +5. **Tunnel setup latency** – Histograms for DNS resolution and tunnel handshakes would + help correlate connect latency spikes with network dependencies. These metrics rely on data that is already available in the code paths mentioned above and would round out operational dashboards. diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 5403e43..6c34724 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -3,6 +3,8 @@ package telemetry import ( "context" "sync" + "sync/atomic" + "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -37,9 +39,9 @@ var ( // Config/Restart mConfigReloads metric.Int64Counter - mRestartCount metric.Int64Counter mConfigApply metric.Float64Histogram mCertRotationTotal metric.Int64Counter + mProcessStartTime metric.Float64ObservableGauge // Build info mBuildInfo metric.Int64ObservableGauge @@ -50,6 +52,8 @@ var ( mWSDisconnects metric.Int64Counter mWSKeepaliveFailure metric.Int64Counter mWSSessionDuration metric.Float64Histogram + mWSConnected metric.Int64ObservableGauge + mWSReconnects metric.Int64Counter // Proxy mProxyActiveConns metric.Int64ObservableGauge @@ -58,16 +62,28 @@ var ( mProxyDropsTotal metric.Int64Counter mProxyAcceptsTotal metric.Int64Counter mProxyConnDuration metric.Float64Histogram + mProxyConnectionsTotal metric.Int64Counter - 
buildVersion string - buildCommit string + buildVersion string + buildCommit string + processStartUnix = float64(time.Now().UnixNano()) / 1e9 + wsConnectedState atomic.Int64 ) -// attrsWithSite appends global site/region labels when present. +// Proxy connection lifecycle events. +const ( + ProxyConnectionOpened = "opened" + ProxyConnectionClosed = "closed" +) + +// attrsWithSite appends site/region labels when site-label emission is enabled +// (on by default via NEWT_METRICS_INCLUDE_SITE_LABELS; set it to false to cap cardinality). func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue { - attrs := make([]attribute.KeyValue, 0, len(extra)+2) - attrs = append(attrs, extra...) - attrs = append(attrs, siteAttrs()...) + attrs := make([]attribute.KeyValue, len(extra)) + copy(attrs, extra) + if ShouldIncludeSiteLabels() { + attrs = append(attrs, siteAttrs()...) + } return attrs } @@ -111,8 +127,9 @@ func registerSiteInstruments() error { if err != nil { return err } - mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds", - metric.WithDescription("Seconds since last site heartbeat")) + mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_timestamp_seconds", + metric.WithDescription("Unix timestamp of the last site heartbeat"), + metric.WithUnit("s")) if err != nil { return err } @@ -164,13 +181,22 @@ func registerConnInstruments() error { func registerConfigInstruments() error { mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total", metric.WithDescription("Configuration reloads")) - mRestartCount, _ = meter.Int64Counter("newt_restart_count_total", - metric.WithDescription("Process restart count (incremented on start)")) mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds", metric.WithDescription("Configuration apply duration in seconds"), metric.WithUnit("s")) mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total", metric.WithDescription("Certificate rotation events (success/failure)")) 
mProcessStartTime, _ = meter.Float64ObservableGauge("process_start_time_seconds", + metric.WithDescription("Unix timestamp of the process start time"), + metric.WithUnit("s")) + if mProcessStartTime != nil { + if _, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + o.ObserveFloat64(mProcessStartTime, processStartUnix) + return nil + }, mProcessStartTime); err != nil { + otel.Handle(err) + } + } return nil } @@ -191,6 +217,10 @@ func registerBuildWSProxyInstruments() error { mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds", metric.WithDescription("Duration of established WebSocket sessions"), metric.WithUnit("s")) + mWSConnected, _ = meter.Int64ObservableGauge("newt_websocket_connected", + metric.WithDescription("WebSocket connection state (1=connected, 0=disconnected)")) + mWSReconnects, _ = meter.Int64Counter("newt_websocket_reconnects_total", + metric.WithDescription("WebSocket reconnect attempts by reason")) // Proxy mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", metric.WithDescription("Proxy active connections per tunnel and protocol")) @@ -207,6 +237,8 @@ func registerBuildWSProxyInstruments() error { mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds", metric.WithDescription("Duration of completed proxy connections"), metric.WithUnit("s")) + mProxyConnectionsTotal, _ = meter.Int64Counter("newt_proxy_connections_total", + metric.WithDescription("Proxy connection lifecycle events by protocol")) // Register a default callback for build info if version/commit set reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if buildVersion == "" && buildCommit == "" { @@ -219,7 +251,9 @@ func registerBuildWSProxyInstruments() error { if buildCommit != "" { attrs = append(attrs, attribute.String("commit", buildCommit)) } - attrs = append(attrs, siteAttrs()...) 
+ if ShouldIncludeSiteLabels() { + attrs = append(attrs, siteAttrs()...) + } o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...)) return nil }, mBuildInfo) @@ -229,6 +263,17 @@ func registerBuildWSProxyInstruments() error { // Provide a functional stopper that unregisters the callback obsStopper = func() { _ = reg.Unregister() } } + if mWSConnected != nil { + if regConn, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + val := wsConnectedState.Load() + o.ObserveInt64(mWSConnected, val, metric.WithAttributes(attrsWithSite()...)) + return nil + }, mWSConnected); err != nil { + otel.Handle(err) + } else { + wsConnStopper = func() { _ = regConn.Unregister() } + } + } return nil } @@ -237,10 +282,11 @@ func registerBuildWSProxyInstruments() error { // heartbeat seconds, and active sessions. var ( - obsOnce sync.Once - obsStopper func() - proxyObsOnce sync.Once - proxyStopper func() + obsOnce sync.Once + obsStopper func() + proxyObsOnce sync.Once + proxyStopper func() + wsConnStopper func() ) // SetObservableCallback registers a single callback that will be invoked @@ -251,7 +297,7 @@ var ( // // telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error { // o.ObserveInt64(mSiteOnline, 1) -// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds()) +// o.ObserveFloat64(mSiteLastHeartbeat, float64(lastHB.Unix())) // o.ObserveInt64(mTunnelSessions, int64(len(activeSessions))) // return nil // }) @@ -290,8 +336,6 @@ func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) func RegisterBuildInfo(version, commit string) { buildVersion = version buildCommit = commit - // Increment restart count on boot - mRestartCount.Add(context.Background(), 1) } // Config reloads @@ -358,6 +402,25 @@ func IncWSKeepaliveFailure(ctx context.Context, reason string) { )...)) } +// SetWSConnectionState updates the backing gauge for the WebSocket connected state. 
+func SetWSConnectionState(connected bool) { + if connected { + wsConnectedState.Store(1) + } else { + wsConnectedState.Store(0) + } +} + +// IncWSReconnect increments the WebSocket reconnect counter with a bounded reason label. +func IncWSReconnect(ctx context.Context, reason string) { + if reason == "" { + reason = "unknown" + } + mWSReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("reason", reason), + )...)) +} + func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) { mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( attribute.String("result", result), @@ -413,6 +476,21 @@ func ObserveProxyConnectionDuration(ctx context.Context, tunnelID, protocol, res mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...)) } +// IncProxyConnectionEvent records proxy connection lifecycle events (opened/closed). +func IncProxyConnectionEvent(ctx context.Context, tunnelID, protocol, event string) { + if event == "" { + event = "unknown" + } + attrs := []attribute.KeyValue{ + attribute.String("protocol", protocol), + attribute.String("event", event), + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyConnectionsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + // --- Config/PKI helpers --- func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) { diff --git a/internal/telemetry/metrics_test_helper.go b/internal/telemetry/metrics_test_helper.go new file mode 100644 index 0000000..16aa1a3 --- /dev/null +++ b/internal/telemetry/metrics_test_helper.go @@ -0,0 +1,59 @@ +package telemetry + +import ( + "sync" + "time" +) + +func resetMetricsForTest() { + initOnce = sync.Once{} + obsOnce = sync.Once{} + proxyObsOnce = sync.Once{} + obsStopper = nil + proxyStopper = nil + if wsConnStopper != nil { + wsConnStopper() + } + wsConnStopper = nil + 
meter = nil + + mSiteRegistrations = nil + mSiteOnline = nil + mSiteLastHeartbeat = nil + + mTunnelSessions = nil + mTunnelBytes = nil + mTunnelLatency = nil + mReconnects = nil + + mConnAttempts = nil + mConnErrors = nil + + mConfigReloads = nil + mConfigApply = nil + mCertRotationTotal = nil + mProcessStartTime = nil + + mBuildInfo = nil + + mWSConnectLatency = nil + mWSMessages = nil + mWSDisconnects = nil + mWSKeepaliveFailure = nil + mWSSessionDuration = nil + mWSConnected = nil + mWSReconnects = nil + + mProxyActiveConns = nil + mProxyBufferBytes = nil + mProxyAsyncBacklogByte = nil + mProxyDropsTotal = nil + mProxyAcceptsTotal = nil + mProxyConnDuration = nil + mProxyConnectionsTotal = nil + + processStartUnix = float64(time.Now().UnixNano()) / 1e9 + wsConnectedState.Store(0) + includeTunnelIDVal.Store(false) + includeSiteLabelVal.Store(false) +} diff --git a/internal/telemetry/state_view.go b/internal/telemetry/state_view.go index 071a405..6c6b6de 100644 --- a/internal/telemetry/state_view.go +++ b/internal/telemetry/state_view.go @@ -62,8 +62,8 @@ func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) { func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) { if t, ok := sv.LastHeartbeat(siteID); ok { - secs := time.Since(t).Seconds() - o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes( + ts := float64(t.UnixNano()) / 1e9 + o.ObserveFloat64(mSiteLastHeartbeat, ts, metric.WithAttributes( attribute.String("site_id", siteID), )) } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 14100ec..bd435ce 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -118,6 +118,11 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { } else { includeTunnelIDVal.Store(false) } + if getenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") == "true" { + includeSiteLabelVal.Store(true) + } else { + includeSiteLabelVal.Store(false) + } res := 
buildResource(ctx, cfg) UpdateSiteInfo(cfg.SiteID, cfg.Region) @@ -294,7 +299,10 @@ func parseResourceAttributes(s string) map[string]string { // Global site/region used to enrich metric labels. var siteIDVal atomic.Value var regionVal atomic.Value -var includeTunnelIDVal atomic.Value // bool; default true +var ( + includeTunnelIDVal atomic.Value // bool; default true + includeSiteLabelVal atomic.Value // bool; default false +) // UpdateSiteInfo updates the global site_id and region used for metric labels. // Thread-safe via atomic.Value: subsequent metric emissions will include @@ -335,7 +343,12 @@ func siteAttrs() []attribute.KeyValue { } // SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager). -func SiteLabelKVs() []attribute.KeyValue { return siteAttrs() } +func SiteLabelKVs() []attribute.KeyValue { + if !ShouldIncludeSiteLabels() { + return nil + } + return siteAttrs() +} // ShouldIncludeTunnelID returns whether tunnel_id labels should be emitted. func ShouldIncludeTunnelID() bool { @@ -345,6 +358,15 @@ func ShouldIncludeTunnelID() bool { return true } +// ShouldIncludeSiteLabels returns whether site_id/region should be emitted as +// metric labels in addition to resource attributes. +func ShouldIncludeSiteLabels() bool { + if v, ok := includeSiteLabelVal.Load().(bool); ok { + return v + } + return false +} + func getenv(k, d string) string { if v := os.Getenv(k); v != "" { return v diff --git a/internal/telemetry/telemetry_attrfilter_test.go b/internal/telemetry/telemetry_attrfilter_test.go index 6c54afe..ebbb3c2 100644 --- a/internal/telemetry/telemetry_attrfilter_test.go +++ b/internal/telemetry/telemetry_attrfilter_test.go @@ -14,17 +14,23 @@ import ( // Test that disallowed attributes are filtered from the exposition. 
func TestAttributeFilterDropsUnknownKeys(t *testing.T) { - ctx := context.Background() -cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"} + ctx := context.Background() + resetMetricsForTest() + t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") + cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"} tel, err := Init(ctx, cfg) - if err != nil { t.Fatalf("init: %v", err) } + if err != nil { + t.Fatalf("init: %v", err) + } defer func() { _ = tel.Shutdown(context.Background()) }() - if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") } + if tel.PrometheusHandler == nil { + t.Fatalf("prom handler nil") + } ts := httptest.NewServer(tel.PrometheusHandler) defer ts.Close() -// Add samples with disallowed attribute keys + // Add samples with disallowed attribute keys for _, k := range []string{"forbidden", "site_id", "host"} { set := attribute.NewSet(attribute.String(k, "x")) AddTunnelBytesSet(ctx, 123, set) @@ -32,7 +38,9 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"} time.Sleep(50 * time.Millisecond) resp, err := http.Get(ts.URL) - if err != nil { t.Fatalf("GET: %v", err) } + if err != nil { + t.Fatalf("GET: %v", err) + } defer resp.Body.Close() b, _ := io.ReadAll(resp.Body) body := string(b) @@ -43,4 +51,3 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"} t.Fatalf("expected allowed attribute site_id to be present in metrics, got: %s", body) } } - diff --git a/internal/telemetry/telemetry_golden_test.go b/internal/telemetry/telemetry_golden_test.go index 3e6f896..62f41b8 100644 --- a/internal/telemetry/telemetry_golden_test.go +++ b/internal/telemetry/telemetry_golden_test.go @@ -7,6 +7,7 @@ import ( "net/http" "net/http/httptest" "os" + "path/filepath" "strings" "testing" "time" @@ -15,36 +16,61 @@ import ( // Golden test that /metrics contains expected metric names. 
func TestMetricsGoldenContains(t *testing.T) { ctx := context.Background() -cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", BuildVersion: "test"} + resetMetricsForTest() + t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") + cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", BuildVersion: "test"} tel, err := Init(ctx, cfg) - if err != nil { t.Fatalf("telemetry init error: %v", err) } + if err != nil { + t.Fatalf("telemetry init error: %v", err) + } defer func() { _ = tel.Shutdown(context.Background()) }() - if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") } + if tel.PrometheusHandler == nil { + t.Fatalf("prom handler nil") + } ts := httptest.NewServer(tel.PrometheusHandler) defer ts.Close() - // Trigger a counter + // Trigger counters to ensure they appear in the scrape IncConnAttempt(ctx, "websocket", "success") + IncWSReconnect(ctx, "io_error") + IncProxyConnectionEvent(ctx, "", "tcp", ProxyConnectionOpened) + if tel.MeterProvider != nil { + _ = tel.MeterProvider.ForceFlush(ctx) + } time.Sleep(100 * time.Millisecond) - resp, err := http.Get(ts.URL) - if err != nil { t.Fatalf("GET metrics failed: %v", err) } - defer resp.Body.Close() - b, _ := io.ReadAll(resp.Body) - body := string(b) + var body string + for i := 0; i < 5; i++ { + resp, err := http.Get(ts.URL) + if err != nil { + t.Fatalf("GET metrics failed: %v", err) + } + b, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + body = string(b) + if strings.Contains(body, "newt_connection_attempts_total") { + break + } + time.Sleep(100 * time.Millisecond) + } - f, err := os.Open("internal/telemetry/testdata/expected_contains.golden") - if err != nil { t.Fatalf("read golden: %v", err) } + f, err := os.Open(filepath.Join("testdata", "expected_contains.golden")) + if err != nil { + t.Fatalf("read golden: %v", err) + } defer f.Close() s := bufio.NewScanner(f) for s.Scan() { needle := strings.TrimSpace(s.Text()) - if needle == "" { continue 
} + if needle == "" { + continue + } if !strings.Contains(body, needle) { t.Fatalf("expected metrics body to contain %q. body=\n%s", needle, body) } } - if err := s.Err(); err != nil { t.Fatalf("scan golden: %v", err) } + if err := s.Err(); err != nil { + t.Fatalf("scan golden: %v", err) + } } - diff --git a/internal/telemetry/telemetry_smoke_test.go b/internal/telemetry/telemetry_smoke_test.go index d51ea8e..b736ca5 100644 --- a/internal/telemetry/telemetry_smoke_test.go +++ b/internal/telemetry/telemetry_smoke_test.go @@ -13,13 +13,15 @@ import ( // Smoke test that /metrics contains at least one newt_* metric when Prom exporter is enabled. func TestMetricsSmoke(t *testing.T) { ctx := context.Background() + resetMetricsForTest() + t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") cfg := Config{ - ServiceName: "newt", - PromEnabled: true, - OTLPEnabled: false, - AdminAddr: "127.0.0.1:0", - BuildVersion: "test", - BuildCommit: "deadbeef", + ServiceName: "newt", + PromEnabled: true, + OTLPEnabled: false, + AdminAddr: "127.0.0.1:0", + BuildVersion: "test", + BuildCommit: "deadbeef", MetricExportInterval: 5 * time.Second, } tel, err := Init(ctx, cfg) @@ -37,18 +39,27 @@ func TestMetricsSmoke(t *testing.T) { // Record a simple metric and then fetch /metrics IncConnAttempt(ctx, "websocket", "success") + if tel.MeterProvider != nil { + _ = tel.MeterProvider.ForceFlush(ctx) + } // Give the exporter a tick to collect time.Sleep(100 * time.Millisecond) - resp, err := http.Get(ts.URL) - if err != nil { - t.Fatalf("GET /metrics failed: %v", err) + var body string + for i := 0; i < 5; i++ { + resp, err := http.Get(ts.URL) + if err != nil { + t.Fatalf("GET /metrics failed: %v", err) + } + b, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + body = string(b) + if strings.Contains(body, "newt_connection_attempts_total") { + break + } + time.Sleep(100 * time.Millisecond) } - defer resp.Body.Close() - b, _ := io.ReadAll(resp.Body) - body := string(b) if !strings.Contains(body, 
"newt_connection_attempts_total") { t.Fatalf("expected newt_connection_attempts_total in metrics, got:\n%s", body) } } - diff --git a/internal/telemetry/testdata/expected_contains.golden b/internal/telemetry/testdata/expected_contains.golden index 48123dd..50d3892 100644 --- a/internal/telemetry/testdata/expected_contains.golden +++ b/internal/telemetry/testdata/expected_contains.golden @@ -1,3 +1,7 @@ newt_connection_attempts_total +newt_websocket_connected +newt_websocket_reconnects_total +newt_proxy_connections_total newt_build_info +process_start_time_seconds diff --git a/proxy/manager.go b/proxy/manager.go index 31e7788..cef5fa6 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -509,6 +509,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) tunnelID := pm.currentTunnelID telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "") + telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionOpened) if tunnelID != "" { state.Global().IncSessions(tunnelID) if e := pm.getEntry(tunnelID); e != nil { @@ -523,6 +524,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) logger.Error("Error connecting to target: %v", err) accepted.Close() telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err)) + telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed) telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds()) return } @@ -556,6 +558,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) } } telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds()) + telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed) }(tunnelID, conn) } } @@ -631,6 +634,7 
@@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { } tunnelID := pm.currentTunnelID telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "") + telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened) // Only increment activeUDP after a successful DialUDP if e := pm.getEntry(tunnelID); e != nil { e.activeUDP.Add(1) @@ -655,6 +659,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { } clientsMutex.Unlock() telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds()) + telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed) }() buffer := make([]byte, 65507) diff --git a/scripts/smoke-metrics.sh b/scripts/smoke-metrics.sh index d2eb11f..27dd02e 100644 --- a/scripts/smoke-metrics.sh +++ b/scripts/smoke-metrics.sh @@ -20,7 +20,7 @@ probe "newt_* presence" "^newt_" || true # Site gauges with site_id probe "site_online with site_id" "^newt_site_online\{.*site_id=\"[^\"]+\"" || true -probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_seconds\{.*site_id=\"[^\"]+\"" || true +probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_timestamp_seconds\{.*site_id=\"[^\"]+\"" || true # Bytes with direction ingress/egress and protocol probe "tunnel bytes ingress" "^newt_tunnel_bytes_total\{.*direction=\"ingress\".*protocol=\"(tcp|udp)\"" || true @@ -39,11 +39,14 @@ fi # WebSocket metrics (when OTLP/WS used) probe "websocket connect latency buckets" "^newt_websocket_connect_latency_seconds_bucket" || true probe "websocket messages total" "^newt_websocket_messages_total\{.*(direction|msg_type)=" || true +probe "websocket connected gauge" "^newt_websocket_connected" || true +probe "websocket reconnects total" "^newt_websocket_reconnects_total\{" || true # Proxy metrics (when proxy active) probe "proxy active connections" 
"^newt_proxy_active_connections\{" || true probe "proxy buffer bytes" "^newt_proxy_buffer_bytes\{" || true probe "proxy drops total" "^newt_proxy_drops_total\{" || true +probe "proxy connections total" "^newt_proxy_connections_total\{" || true # Config apply probe "config apply seconds buckets" "^newt_config_apply_seconds_bucket\{" || true diff --git a/websocket/client.go b/websocket/client.go index ba39202..8af3be9 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -167,6 +167,7 @@ func (c *Client) Close() error { // Set connection status to false c.setConnected(false) + telemetry.SetWSConnectionState(false) // Close the WebSocket connection gracefully if c.conn != nil { @@ -508,6 +509,7 @@ func (c *Client) establishConnection() error { } else { telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError) } + telemetry.IncWSReconnect(ctx, etype) return fmt.Errorf("failed to connect to WebSocket: %w", err) } @@ -515,6 +517,7 @@ func (c *Client) establishConnection() error { telemetry.ObserveWSConnectLatency(ctx, lat, "success", "") c.conn = conn c.setConnected(true) + telemetry.SetWSConnectionState(true) c.setMetricsContext(ctx) sessionStart := time.Now() // Wire up pong handler for metrics @@ -632,6 +635,7 @@ func (c *Client) pingMonitor() { default: logger.Error("Ping failed: %v", err) telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") + telemetry.IncWSReconnect(c.metricsContext(), "ping_write") c.reconnect() return } @@ -660,6 +664,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { // Shutting down, don't reconnect return default: + telemetry.IncWSReconnect(ctx, disconnectReason) c.reconnect() } }() @@ -710,6 +715,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { func (c *Client) reconnect() { c.setConnected(false) + telemetry.SetWSConnectionState(false) if c.conn != nil { c.conn.Close() c.conn = nil