mirror of
https://github.com/fosrl/newt.git
synced 2026-03-26 20:46:41 +00:00
Merge branch 'main' of https://github.com/marcschaeferger/newt
This commit is contained in:
@@ -6,20 +6,20 @@ This document captures the current state of Newt metrics, prioritized fixes, and
|
|||||||
|
|
||||||
- Export: Prometheus exposition (default), optional OTLP (gRPC)
|
- Export: Prometheus exposition (default), optional OTLP (gRPC)
|
||||||
- Existing instruments:
|
- Existing instruments:
|
||||||
- Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_seconds
|
- Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_timestamp_seconds
|
||||||
- Tunnel/Traffic: newt_tunnel_sessions, newt_tunnel_bytes_total, newt_tunnel_latency_seconds, newt_tunnel_reconnects_total
|
- Tunnel/Traffic: newt_tunnel_sessions, newt_tunnel_bytes_total, newt_tunnel_latency_seconds, newt_tunnel_reconnects_total
|
||||||
- Connection lifecycle: newt_connection_attempts_total, newt_connection_errors_total
|
- Connection lifecycle: newt_connection_attempts_total, newt_connection_errors_total
|
||||||
- Operations: newt_config_reloads_total, newt_restart_count_total, newt_build_info
|
- Operations: newt_config_reloads_total, process_start_time_seconds, newt_build_info
|
||||||
- Operations: newt_config_reloads_total, newt_restart_count_total, newt_config_apply_seconds, newt_cert_rotation_total
|
- Operations: newt_config_reloads_total, process_start_time_seconds, newt_config_apply_seconds, newt_cert_rotation_total
|
||||||
- Build metadata: newt_build_info
|
- Build metadata: newt_build_info
|
||||||
- Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total
|
- Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total, newt_websocket_connected, newt_websocket_reconnects_total
|
||||||
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total
|
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total, newt_proxy_accept_total, newt_proxy_connection_duration_seconds, newt_proxy_connections_total
|
||||||
- Go runtime: GC, heap, goroutines via runtime instrumentation
|
- Go runtime: GC, heap, goroutines via runtime instrumentation
|
||||||
|
|
||||||
2) Main issues addressed now
|
2) Main issues addressed now
|
||||||
|
|
||||||
- Attribute filter (allow-list) extended to include site_id and region in addition to existing keys (tunnel_id, transport, protocol, direction, result, reason, error_type, version, commit).
|
- Attribute filter (allow-list) extended to include site_id and region in addition to existing keys (tunnel_id, transport, protocol, direction, result, reason, error_type, version, commit).
|
||||||
- site_id and region propagation: site_id is now attached as a metric label across newt_*; region is added as a metric label when set. Both remain resource attributes for consistency with OTEL.
|
- site_id and region propagation: site_id/region remain resource attributes. Metric labels mirror them on per-site gauges and counters by default; set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them for multi-tenant scrapes.
|
||||||
- Label semantics clarified:
|
- Label semantics clarified:
|
||||||
- transport: control-plane mechanism (e.g., websocket, wireguard)
|
- transport: control-plane mechanism (e.g., websocket, wireguard)
|
||||||
- protocol: L4 payload type (tcp, udp)
|
- protocol: L4 payload type (tcp, udp)
|
||||||
@@ -29,10 +29,9 @@ This document captures the current state of Newt metrics, prioritized fixes, and
|
|||||||
3) Remaining gaps and deviations
|
3) Remaining gaps and deviations
|
||||||
|
|
||||||
- Some call sites still need initiator label on reconnect outcomes (client vs server). This is planned.
|
- Some call sites still need initiator label on reconnect outcomes (client vs server). This is planned.
|
||||||
- WebSocket and Proxy metrics (connect latency, messages, active connections, buffer/drops, async backlog) are planned additions.
|
|
||||||
- Config apply duration and cert rotation counters are planned.
|
- Config apply duration and cert rotation counters are planned.
|
||||||
- Registration and config reload failures are not yet emitted; add failure code paths so result labels expose churn.
|
- Registration and config reload failures are not yet emitted; add failure code paths so result labels expose churn.
|
||||||
- Restart counter increments only when build metadata is provided; consider decoupling to count all boots.
|
- Document using `process_start_time_seconds` (and `time()` in PromQL) to derive uptime; no explicit restart counter is needed.
|
||||||
- Metric helpers often use `context.Background()`. Where lightweight contexts exist (e.g., HTTP handlers), propagate them to ease future correlation.
|
- Metric helpers often use `context.Background()`. Where lightweight contexts exist (e.g., HTTP handlers), propagate them to ease future correlation.
|
||||||
- Tracing coverage is limited to admin HTTP and WebSocket connect spans; extend to blueprint fetches, proxy accept loops, and WireGuard updates when OTLP is enabled.
|
- Tracing coverage is limited to admin HTTP and WebSocket connect spans; extend to blueprint fetches, proxy accept loops, and WireGuard updates when OTLP is enabled.
|
||||||
|
|
||||||
@@ -44,8 +43,6 @@ This document captures the current state of Newt metrics, prioritized fixes, and
|
|||||||
- Correct label semantics (transport vs protocol); fix sessions transport labelling
|
- Correct label semantics (transport vs protocol); fix sessions transport labelling
|
||||||
- Documentation alignment
|
- Documentation alignment
|
||||||
- Phase 2 (next)
|
- Phase 2 (next)
|
||||||
- WebSocket: newt_websocket_connect_latency_seconds; newt_websocket_messages_total{direction,msg_type}
|
|
||||||
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_drops_total, newt_proxy_async_backlog_bytes
|
|
||||||
- Reconnect: add initiator label (client/server)
|
- Reconnect: add initiator label (client/server)
|
||||||
- Config & PKI: newt_config_apply_seconds{phase,result}; newt_cert_rotation_total{result}
|
- Config & PKI: newt_config_apply_seconds{phase,result}; newt_cert_rotation_total{result}
|
||||||
- WebSocket disconnect and keepalive failure counters
|
- WebSocket disconnect and keepalive failure counters
|
||||||
@@ -66,7 +63,7 @@ This document captures the current state of Newt metrics, prioritized fixes, and
|
|||||||
- Sustained connection errors:
|
- Sustained connection errors:
|
||||||
- rate(newt_connection_errors_total[5m]) by (site_id,transport,error_type)
|
- rate(newt_connection_errors_total[5m]) by (site_id,transport,error_type)
|
||||||
- Heartbeat gaps:
|
- Heartbeat gaps:
|
||||||
- max_over_time(newt_site_last_heartbeat_seconds[15m]) by (site_id)
|
- max_over_time(time() - newt_site_last_heartbeat_timestamp_seconds[15m]) by (site_id)
|
||||||
- Proxy drops:
|
- Proxy drops:
|
||||||
- increase(newt_proxy_drops_total[5m]) by (site_id,protocol)
|
- increase(newt_proxy_drops_total[5m]) by (site_id,protocol)
|
||||||
- WebSocket connect p95 (when added):
|
- WebSocket connect p95 (when added):
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ Enable exporters via environment variables (no code changes required)
|
|||||||
- OTEL_RESOURCE_ATTRIBUTES=service.instance.id=<id>,site_id=<id>
|
- OTEL_RESOURCE_ATTRIBUTES=service.instance.id=<id>,site_id=<id>
|
||||||
- OTEL_METRIC_EXPORT_INTERVAL=15s (default)
|
- OTEL_METRIC_EXPORT_INTERVAL=15s (default)
|
||||||
- NEWT_ADMIN_ADDR=127.0.0.1:2112 (default admin HTTP with /metrics)
|
- NEWT_ADMIN_ADDR=127.0.0.1:2112 (default admin HTTP with /metrics)
|
||||||
|
- NEWT_METRICS_INCLUDE_SITE_LABELS=true|false (default: true; disable to drop site_id/region as metric labels and rely on resource attributes only)
|
||||||
|
|
||||||
Runtime behavior
|
Runtime behavior
|
||||||
|
|
||||||
@@ -36,12 +37,14 @@ Runtime behavior
|
|||||||
|
|
||||||
Metric catalog (current)
|
Metric catalog (current)
|
||||||
|
|
||||||
|
Unless otherwise noted, `site_id` and `region` are available via resource attributes and, by default, as metric labels. Set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them from counter/histogram label sets in high-cardinality environments.
|
||||||
|
|
||||||
| Metric | Instrument | Key attributes | Purpose | Example |
|
| Metric | Instrument | Key attributes | Purpose | Example |
|
||||||
| --- | --- | --- | --- | --- |
|
| --- | --- | --- | --- | --- |
|
||||||
| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0",site_id="acme-edge-1"} 1` |
|
| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional when site labels enabled) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0"} 1` |
|
||||||
| `newt_site_registrations_total` | Counter (Int64) | `result` (`success`/`failure`), `site_id`, `region` (optional) | Counts Pangolin registration attempts. | `newt_site_registrations_total{result="success",site_id="acme-edge-1"} 1` |
|
| `newt_site_registrations_total` | Counter (Int64) | `result` (`success`/`failure`), `site_id`, `region` (optional) | Counts Pangolin registration attempts. | `newt_site_registrations_total{result="success",site_id="acme-edge-1"} 1` |
|
||||||
| `newt_site_online` | Observable gauge (Int64) | `site_id` | Reports whether the site is currently connected (`1`) or offline (`0`). | `newt_site_online{site_id="acme-edge-1"} 1` |
|
| `newt_site_online` | Observable gauge (Int64) | `site_id` | Reports whether the site is currently connected (`1`) or offline (`0`). | `newt_site_online{site_id="acme-edge-1"} 1` |
|
||||||
| `newt_site_last_heartbeat_seconds` | Observable gauge (Float64) | `site_id` | Time since the most recent Pangolin heartbeat. | `newt_site_last_heartbeat_seconds{site_id="acme-edge-1"} 2.4` |
|
| `newt_site_last_heartbeat_timestamp_seconds` | Observable gauge (Float64) | `site_id` | Unix timestamp of the most recent Pangolin heartbeat (derive age via `time() - metric`). | `newt_site_last_heartbeat_timestamp_seconds{site_id="acme-edge-1"} 1.728e+09` |
|
||||||
| `newt_tunnel_sessions` | Observable gauge (Int64) | `site_id`, `tunnel_id` (when enabled) | Counts active tunnel sessions per peer; collapses to per-site when tunnel IDs are disabled. | `newt_tunnel_sessions{site_id="acme-edge-1",tunnel_id="wgpub..."} 3` |
|
| `newt_tunnel_sessions` | Observable gauge (Int64) | `site_id`, `tunnel_id` (when enabled) | Counts active tunnel sessions per peer; collapses to per-site when tunnel IDs are disabled. | `newt_tunnel_sessions{site_id="acme-edge-1",tunnel_id="wgpub..."} 3` |
|
||||||
| `newt_tunnel_bytes_total` | Counter (Int64) | `direction` (`ingress`/`egress`), `protocol` (`tcp`/`udp`), `tunnel_id` (optional), `site_id`, `region` (optional) | Measures proxied traffic volume across tunnels. | `newt_tunnel_bytes_total{direction="ingress",protocol="tcp",site_id="acme-edge-1"} 4096` |
|
| `newt_tunnel_bytes_total` | Counter (Int64) | `direction` (`ingress`/`egress`), `protocol` (`tcp`/`udp`), `tunnel_id` (optional), `site_id`, `region` (optional) | Measures proxied traffic volume across tunnels. | `newt_tunnel_bytes_total{direction="ingress",protocol="tcp",site_id="acme-edge-1"} 4096` |
|
||||||
| `newt_tunnel_latency_seconds` | Histogram (Float64) | `transport` (e.g., `wireguard`), `tunnel_id` (optional), `site_id`, `region` (optional) | Captures RTT or configuration-driven latency samples. | `newt_tunnel_latency_seconds_bucket{transport="wireguard",le="0.5"} 42` |
|
| `newt_tunnel_latency_seconds` | Histogram (Float64) | `transport` (e.g., `wireguard`), `tunnel_id` (optional), `site_id`, `region` (optional) | Captures RTT or configuration-driven latency samples. | `newt_tunnel_latency_seconds_bucket{transport="wireguard",le="0.5"} 42` |
|
||||||
@@ -49,15 +52,18 @@ Metric catalog (current)
|
|||||||
| `newt_connection_attempts_total` | Counter (Int64) | `transport` (`auth`/`websocket`), `result`, `site_id`, `region` (optional) | Measures control-plane dial attempts and their outcomes. | `newt_connection_attempts_total{transport="websocket",result="success",site_id="acme-edge-1"} 8` |
|
| `newt_connection_attempts_total` | Counter (Int64) | `transport` (`auth`/`websocket`), `result`, `site_id`, `region` (optional) | Measures control-plane dial attempts and their outcomes. | `newt_connection_attempts_total{transport="websocket",result="success",site_id="acme-edge-1"} 8` |
|
||||||
| `newt_connection_errors_total` | Counter (Int64) | `transport`, `error_type`, `site_id`, `region` (optional) | Buckets connection failures by normalized error class. | `newt_connection_errors_total{transport="websocket",error_type="tls_handshake",site_id="acme-edge-1"} 1` |
|
| `newt_connection_errors_total` | Counter (Int64) | `transport`, `error_type`, `site_id`, `region` (optional) | Buckets connection failures by normalized error class. | `newt_connection_errors_total{transport="websocket",error_type="tls_handshake",site_id="acme-edge-1"} 1` |
|
||||||
| `newt_config_reloads_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Counts remote blueprint/config reloads. | `newt_config_reloads_total{result="success",site_id="acme-edge-1"} 3` |
|
| `newt_config_reloads_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Counts remote blueprint/config reloads. | `newt_config_reloads_total{result="success",site_id="acme-edge-1"} 3` |
|
||||||
| `newt_restart_count_total` | Counter (Int64) | `site_id`, `region` (optional) | Increments once per process boot to detect restarts. | `newt_restart_count_total{site_id="acme-edge-1"} 1` |
|
| `process_start_time_seconds` | Observable gauge (Float64) | — | Unix timestamp of the Newt process start time (use `time() - process_start_time_seconds` for uptime). | `process_start_time_seconds 1.728e+09` |
|
||||||
| `newt_config_apply_seconds` | Histogram (Float64) | `phase` (`interface`/`peer`), `result`, `site_id`, `region` (optional) | Measures time spent applying WireGuard configuration phases. | `newt_config_apply_seconds_sum{phase="peer",result="success",site_id="acme-edge-1"} 0.48` |
|
| `newt_config_apply_seconds` | Histogram (Float64) | `phase` (`interface`/`peer`), `result`, `site_id`, `region` (optional) | Measures time spent applying WireGuard configuration phases. | `newt_config_apply_seconds_sum{phase="peer",result="success",site_id="acme-edge-1"} 0.48` |
|
||||||
| `newt_cert_rotation_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Tracks client certificate rotation attempts. | `newt_cert_rotation_total{result="success",site_id="acme-edge-1"} 2` |
|
| `newt_cert_rotation_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Tracks client certificate rotation attempts. | `newt_cert_rotation_total{result="success",site_id="acme-edge-1"} 2` |
|
||||||
| `newt_websocket_connect_latency_seconds` | Histogram (Float64) | `transport="websocket"`, `result`, `error_type` (on failure), `site_id`, `region` (optional) | Measures WebSocket dial latency and exposes failure buckets. | `newt_websocket_connect_latency_seconds_bucket{result="success",le="0.5",site_id="acme-edge-1"} 9` |
|
| `newt_websocket_connect_latency_seconds` | Histogram (Float64) | `transport="websocket"`, `result`, `error_type` (on failure), `site_id`, `region` (optional) | Measures WebSocket dial latency and exposes failure buckets. | `newt_websocket_connect_latency_seconds_bucket{result="success",le="0.5",site_id="acme-edge-1"} 9` |
|
||||||
| `newt_websocket_messages_total` | Counter (Int64) | `direction` (`in`/`out`), `msg_type` (`text`/`ping`/`pong`), `site_id`, `region` (optional) | Accounts for control WebSocket traffic volume by type. | `newt_websocket_messages_total{direction="out",msg_type="ping",site_id="acme-edge-1"} 12` |
|
| `newt_websocket_messages_total` | Counter (Int64) | `direction` (`in`/`out`), `msg_type` (`text`/`ping`/`pong`), `site_id`, `region` (optional) | Accounts for control WebSocket traffic volume by type. | `newt_websocket_messages_total{direction="out",msg_type="ping",site_id="acme-edge-1"} 12` |
|
||||||
|
| `newt_websocket_connected` | Observable gauge (Int64) | `site_id`, `region` (optional) | Reports current WebSocket connectivity (`1` when connected). | `newt_websocket_connected{site_id="acme-edge-1"} 1` |
|
||||||
|
| `newt_websocket_reconnects_total` | Counter (Int64) | `reason` (`tls_handshake`, `dial_timeout`, `io_error`, `ping_write`, `timeout`, etc.), `site_id`, `region` (optional) | Counts reconnect attempts with normalized reasons for failure analysis. | `newt_websocket_reconnects_total{reason="timeout",site_id="acme-edge-1"} 3` |
|
||||||
| `newt_proxy_active_connections` | Observable gauge (Int64) | `protocol` (`tcp`/`udp`), `direction` (`ingress`/`egress`), `tunnel_id` (optional), `site_id`, `region` (optional) | Current proxy connections per tunnel and protocol. | `newt_proxy_active_connections{protocol="tcp",direction="egress",site_id="acme-edge-1"} 4` |
|
| `newt_proxy_active_connections` | Observable gauge (Int64) | `protocol` (`tcp`/`udp`), `direction` (`ingress`/`egress`), `tunnel_id` (optional), `site_id`, `region` (optional) | Current proxy connections per tunnel and protocol. | `newt_proxy_active_connections{protocol="tcp",direction="egress",site_id="acme-edge-1"} 4` |
|
||||||
| `newt_proxy_buffer_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Volume of buffered data awaiting flush in proxy queues. | `newt_proxy_buffer_bytes{protocol="udp",direction="egress",site_id="acme-edge-1"} 2048` |
|
| `newt_proxy_buffer_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Volume of buffered data awaiting flush in proxy queues. | `newt_proxy_buffer_bytes{protocol="udp",direction="egress",site_id="acme-edge-1"} 2048` |
|
||||||
| `newt_proxy_async_backlog_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks async write backlog when deferred flushing is enabled. | `newt_proxy_async_backlog_bytes{protocol="tcp",direction="egress",site_id="acme-edge-1"} 512` |
|
| `newt_proxy_async_backlog_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks async write backlog when deferred flushing is enabled. | `newt_proxy_async_backlog_bytes{protocol="tcp",direction="egress",site_id="acme-edge-1"} 512` |
|
||||||
| `newt_proxy_drops_total` | Counter (Int64) | `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Counts proxy drop events caused by downstream write errors. | `newt_proxy_drops_total{protocol="udp",site_id="acme-edge-1"} 1` |
|
| `newt_proxy_drops_total` | Counter (Int64) | `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Counts proxy drop events caused by downstream write errors. | `newt_proxy_drops_total{protocol="udp",site_id="acme-edge-1"} 1` |
|
||||||
|
| `newt_proxy_connections_total` | Counter (Int64) | `event` (`opened`/`closed`), `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks proxy connection lifecycle events for rate/SLO calculations. | `newt_proxy_connections_total{event="opened",protocol="tcp",site_id="acme-edge-1"} 10` |
|
||||||
|
|
||||||
Conventions
|
Conventions
|
||||||
|
|
||||||
@@ -174,7 +180,7 @@ sum(newt_tunnel_sessions)
|
|||||||
Compatibility notes
|
Compatibility notes
|
||||||
|
|
||||||
- Gauges do not use the _total suffix (e.g., newt_tunnel_sessions).
|
- Gauges do not use the _total suffix (e.g., newt_tunnel_sessions).
|
||||||
- site_id is emitted as both resource attribute and metric label on all newt_* series; region is included as a metric label only when set. tunnel_id is a metric label (WireGuard public key). Never expose secrets in labels.
|
- site_id/region remain resource attributes. Metric labels for these fields appear on per-site gauges (e.g., `newt_site_online`) and, by default, on counters/histograms; disable them with `NEWT_METRICS_INCLUDE_SITE_LABELS=false` if needed. `tunnel_id` is a metric label (WireGuard public key). Never expose secrets in labels.
|
||||||
- NEWT_METRICS_INCLUDE_TUNNEL_ID (default: true) toggles whether tunnel_id is included as a label on bytes/sessions/proxy/reconnect metrics. Disable in high-cardinality environments.
|
- NEWT_METRICS_INCLUDE_TUNNEL_ID (default: true) toggles whether tunnel_id is included as a label on bytes/sessions/proxy/reconnect metrics. Disable in high-cardinality environments.
|
||||||
- Avoid double-scraping: scrape either Newt (/metrics) or the Collector's Prometheus exporter, not both.
|
- Avoid double-scraping: scrape either Newt (/metrics) or the Collector's Prometheus exporter, not both.
|
||||||
- Prometheus does not accept remote_write; use Mimir/Cortex/VM/Thanos-Receive for remote_write.
|
- Prometheus does not accept remote_write; use Mimir/Cortex/VM/Thanos-Receive for remote_write.
|
||||||
|
|||||||
@@ -16,7 +16,7 @@ A global attribute filter (see `buildMeterProvider`) constrains exposed label ke
|
|||||||
exports stay bounded.
|
exports stay bounded.
|
||||||
|
|
||||||
- **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and
|
- **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and
|
||||||
`newt_site_last_heartbeat_seconds` capture registration attempts and liveness. They
|
`newt_site_last_heartbeat_timestamp_seconds` capture registration attempts and liveness. They
|
||||||
are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state
|
are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state
|
||||||
callback that publishes observable gauges for the active site.
|
callback that publishes observable gauges for the active site.
|
||||||
- **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects,
|
- **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects,
|
||||||
@@ -27,17 +27,20 @@ exports stay bounded.
|
|||||||
`newt_connection_errors_total` are emitted throughout the WebSocket client to classify
|
`newt_connection_errors_total` are emitted throughout the WebSocket client to classify
|
||||||
authentication, dial, and transport failures.
|
authentication, dial, and transport failures.
|
||||||
- **Operations/configuration**: `newt_config_reloads_total`,
|
- **Operations/configuration**: `newt_config_reloads_total`,
|
||||||
`newt_restart_count_total`, `newt_config_apply_seconds`, and
|
`process_start_time_seconds`, `newt_config_apply_seconds`, and
|
||||||
`newt_cert_rotation_total` provide visibility into blueprint reloads, process boots,
|
`newt_cert_rotation_total` provide visibility into blueprint reloads, process boots,
|
||||||
configuration timings, and certificate rotation outcomes.
|
configuration timings, and certificate rotation outcomes.
|
||||||
- **Build metadata**: `newt_build_info` records the binary version/commit together
|
- **Build metadata**: `newt_build_info` records the binary version/commit together
|
||||||
with a monotonic restart counter when build information is supplied at startup.
|
with optional site metadata when build information is supplied at startup.
|
||||||
- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds` and
|
- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds`,
|
||||||
`newt_websocket_messages_total` report connect latency and ping/pong/text activity.
|
`newt_websocket_messages_total`, `newt_websocket_connected`, and
|
||||||
|
`newt_websocket_reconnects_total` report connect latency, ping/pong/text activity,
|
||||||
|
connection state, and reconnect reasons.
|
||||||
- **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`,
|
- **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`,
|
||||||
`newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) and the
|
`newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) plus counters for
|
||||||
`newt_proxy_drops_total` counter are fed from the proxy manager to monitor backlog
|
drops, accepts, connection lifecycle events (`newt_proxy_connections_total`), and
|
||||||
and drop behaviour alongside per-protocol byte counters.
|
duration histograms (`newt_proxy_connection_duration_seconds`) surface backlog,
|
||||||
|
drop behaviour, and churn alongside per-protocol byte counters.
|
||||||
|
|
||||||
Refer to `docs/observability.md` for a tabular catalogue with instrument types,
|
Refer to `docs/observability.md` for a tabular catalogue with instrument types,
|
||||||
attributes, and sample exposition lines.
|
attributes, and sample exposition lines.
|
||||||
@@ -61,8 +64,9 @@ The implementation adheres to most OTel Go recommendations:
|
|||||||
suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are
|
suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are
|
||||||
registered with explicit second-based buckets.
|
registered with explicit second-based buckets.
|
||||||
- **Resource attributes** – Service name/version and optional `site_id`/`region`
|
- **Resource attributes** – Service name/version and optional `site_id`/`region`
|
||||||
populate the `resource.Resource` and are also injected as metric attributes for
|
populate the `resource.Resource`. Metric labels mirror these by default (and on
|
||||||
compatibility with Prometheus queries.
|
per-site gauges) but can be disabled with `NEWT_METRICS_INCLUDE_SITE_LABELS=false`
|
||||||
|
to avoid unnecessary cardinality growth.
|
||||||
- **Attribute hygiene** – A single attribute filter (`sdkmetric.WithView`) enforces
|
- **Attribute hygiene** – A single attribute filter (`sdkmetric.WithView`) enforces
|
||||||
the allow-list of label keys to prevent accidental high-cardinality emission.
|
the allow-list of label keys to prevent accidental high-cardinality emission.
|
||||||
- **Runtime metrics** – Go runtime instrumentation is enabled automatically through
|
- **Runtime metrics** – Go runtime instrumentation is enabled automatically through
|
||||||
@@ -83,10 +87,9 @@ The review identified a few actionable adjustments:
|
|||||||
2. **Surface config reload failures** – `telemetry.IncConfigReload` is invoked with
|
2. **Surface config reload failures** – `telemetry.IncConfigReload` is invoked with
|
||||||
`result="success"` only. Callers should record a failure result when blueprint
|
`result="success"` only. Callers should record a failure result when blueprint
|
||||||
parsing or application aborts before success counters are incremented.
|
parsing or application aborts before success counters are incremented.
|
||||||
3. **Harmonise restart count behaviour** – `newt_restart_count_total` increments only
|
3. **Expose robust uptime** – Document using `time() - process_start_time_seconds`
|
||||||
when build metadata is provided. Consider moving the increment out of
|
to derive uptime now that the restart counter has been replaced with a timestamp
|
||||||
`RegisterBuildInfo` so the counter advances even for ad-hoc builds without version
|
gauge.
|
||||||
strings.
|
|
||||||
4. **Propagate contexts where available** – Many emitters call metric helpers with
|
4. **Propagate contexts where available** – Many emitters call metric helpers with
|
||||||
`context.Background()`. Passing real contexts (when inexpensive) would allow future
|
`context.Background()`. Passing real contexts (when inexpensive) would allow future
|
||||||
exporters to correlate spans and metrics.
|
exporters to correlate spans and metrics.
|
||||||
@@ -98,17 +101,17 @@ The review identified a few actionable adjustments:
|
|||||||
|
|
||||||
Prioritised additions that would close visibility gaps:
|
Prioritised additions that would close visibility gaps:
|
||||||
|
|
||||||
1. **WebSocket disconnect outcomes** – A counter (e.g., `newt_websocket_disconnects_total`)
|
1. **Config reload error taxonomy** – Split reload attempts into a dedicated
|
||||||
partitioned by `reason` would complement the existing connect latency histogram and
|
`newt_config_reload_errors_total{phase}` counter to make blueprint validation failures
|
||||||
explain reconnect storms.
|
visible alongside the existing success counter.
|
||||||
2. **Keepalive/heartbeat failures** – Counting ping timeouts or heartbeat misses would
|
2. **Config source visibility** – Export `newt_config_source_info{source,version}` so
|
||||||
make `newt_site_last_heartbeat_seconds` actionable by providing discrete events.
|
operators can audit the active blueprint origin/commit during incidents.
|
||||||
3. **Proxy connection lifecycle** – Add counters/histograms for proxy accept events and
|
3. **Certificate expiry** – Emit `newt_cert_expiry_timestamp_seconds` (per cert) to
|
||||||
connection durations to correlate drops with load and backlog metrics.
|
enable proactive alerts before mTLS credentials lapse.
|
||||||
4. **Blueprint/config pull latency** – Measuring Pangolin blueprint fetch durations and
|
4. **Blueprint/config pull latency** – Measuring Pangolin blueprint fetch durations and
|
||||||
HTTP status distribution would expose slow control-plane operations.
|
HTTP status distribution would expose slow control-plane operations.
|
||||||
5. **Certificate rotation attempts** – Complement `newt_cert_rotation_total` with a
|
5. **Tunnel setup latency** – Histograms for DNS resolution and tunnel handshakes would
|
||||||
duration histogram to observe slow PKI updates and detect stuck rotations.
|
help correlate connect latency spikes with network dependencies.
|
||||||
|
|
||||||
These metrics rely on data that is already available in the code paths mentioned
|
These metrics rely on data that is already available in the code paths mentioned
|
||||||
above and would round out operational dashboards.
|
above and would round out operational dashboards.
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ package telemetry
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"go.opentelemetry.io/otel"
|
"go.opentelemetry.io/otel"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
@@ -37,9 +39,9 @@ var (
|
|||||||
|
|
||||||
// Config/Restart
|
// Config/Restart
|
||||||
mConfigReloads metric.Int64Counter
|
mConfigReloads metric.Int64Counter
|
||||||
mRestartCount metric.Int64Counter
|
|
||||||
mConfigApply metric.Float64Histogram
|
mConfigApply metric.Float64Histogram
|
||||||
mCertRotationTotal metric.Int64Counter
|
mCertRotationTotal metric.Int64Counter
|
||||||
|
mProcessStartTime metric.Float64ObservableGauge
|
||||||
|
|
||||||
// Build info
|
// Build info
|
||||||
mBuildInfo metric.Int64ObservableGauge
|
mBuildInfo metric.Int64ObservableGauge
|
||||||
@@ -50,6 +52,8 @@ var (
|
|||||||
mWSDisconnects metric.Int64Counter
|
mWSDisconnects metric.Int64Counter
|
||||||
mWSKeepaliveFailure metric.Int64Counter
|
mWSKeepaliveFailure metric.Int64Counter
|
||||||
mWSSessionDuration metric.Float64Histogram
|
mWSSessionDuration metric.Float64Histogram
|
||||||
|
mWSConnected metric.Int64ObservableGauge
|
||||||
|
mWSReconnects metric.Int64Counter
|
||||||
|
|
||||||
// Proxy
|
// Proxy
|
||||||
mProxyActiveConns metric.Int64ObservableGauge
|
mProxyActiveConns metric.Int64ObservableGauge
|
||||||
@@ -58,16 +62,28 @@ var (
|
|||||||
mProxyDropsTotal metric.Int64Counter
|
mProxyDropsTotal metric.Int64Counter
|
||||||
mProxyAcceptsTotal metric.Int64Counter
|
mProxyAcceptsTotal metric.Int64Counter
|
||||||
mProxyConnDuration metric.Float64Histogram
|
mProxyConnDuration metric.Float64Histogram
|
||||||
|
mProxyConnectionsTotal metric.Int64Counter
|
||||||
|
|
||||||
buildVersion string
|
buildVersion string
|
||||||
buildCommit string
|
buildCommit string
|
||||||
|
processStartUnix = float64(time.Now().UnixNano()) / 1e9
|
||||||
|
wsConnectedState atomic.Int64
|
||||||
)
|
)
|
||||||
|
|
||||||
// attrsWithSite appends global site/region labels when present.
|
// Proxy connection lifecycle events.
|
||||||
|
const (
|
||||||
|
ProxyConnectionOpened = "opened"
|
||||||
|
ProxyConnectionClosed = "closed"
|
||||||
|
)
|
||||||
|
|
||||||
|
// attrsWithSite appends site/region labels only when explicitly enabled to keep
|
||||||
|
// label cardinality low by default.
|
||||||
func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue {
|
func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue {
|
||||||
attrs := make([]attribute.KeyValue, 0, len(extra)+2)
|
attrs := make([]attribute.KeyValue, len(extra))
|
||||||
attrs = append(attrs, extra...)
|
copy(attrs, extra)
|
||||||
attrs = append(attrs, siteAttrs()...)
|
if ShouldIncludeSiteLabels() {
|
||||||
|
attrs = append(attrs, siteAttrs()...)
|
||||||
|
}
|
||||||
return attrs
|
return attrs
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,8 +127,9 @@ func registerSiteInstruments() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds",
|
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_timestamp_seconds",
|
||||||
metric.WithDescription("Seconds since last site heartbeat"))
|
metric.WithDescription("Unix timestamp of the last site heartbeat"),
|
||||||
|
metric.WithUnit("s"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -164,13 +181,22 @@ func registerConnInstruments() error {
|
|||||||
func registerConfigInstruments() error {
|
func registerConfigInstruments() error {
|
||||||
mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total",
|
mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total",
|
||||||
metric.WithDescription("Configuration reloads"))
|
metric.WithDescription("Configuration reloads"))
|
||||||
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
|
|
||||||
metric.WithDescription("Process restart count (incremented on start)"))
|
|
||||||
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
|
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
|
||||||
metric.WithDescription("Configuration apply duration in seconds"),
|
metric.WithDescription("Configuration apply duration in seconds"),
|
||||||
metric.WithUnit("s"))
|
metric.WithUnit("s"))
|
||||||
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
|
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
|
||||||
metric.WithDescription("Certificate rotation events (success/failure)"))
|
metric.WithDescription("Certificate rotation events (success/failure)"))
|
||||||
|
mProcessStartTime, _ = meter.Float64ObservableGauge("process_start_time_seconds",
|
||||||
|
metric.WithDescription("Unix timestamp of the process start time"),
|
||||||
|
metric.WithUnit("s"))
|
||||||
|
if mProcessStartTime != nil {
|
||||||
|
if _, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
|
||||||
|
o.ObserveFloat64(mProcessStartTime, processStartUnix)
|
||||||
|
return nil
|
||||||
|
}, mProcessStartTime); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -191,6 +217,10 @@ func registerBuildWSProxyInstruments() error {
|
|||||||
mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds",
|
mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds",
|
||||||
metric.WithDescription("Duration of established WebSocket sessions"),
|
metric.WithDescription("Duration of established WebSocket sessions"),
|
||||||
metric.WithUnit("s"))
|
metric.WithUnit("s"))
|
||||||
|
mWSConnected, _ = meter.Int64ObservableGauge("newt_websocket_connected",
|
||||||
|
metric.WithDescription("WebSocket connection state (1=connected, 0=disconnected)"))
|
||||||
|
mWSReconnects, _ = meter.Int64Counter("newt_websocket_reconnects_total",
|
||||||
|
metric.WithDescription("WebSocket reconnect attempts by reason"))
|
||||||
// Proxy
|
// Proxy
|
||||||
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
|
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
|
||||||
metric.WithDescription("Proxy active connections per tunnel and protocol"))
|
metric.WithDescription("Proxy active connections per tunnel and protocol"))
|
||||||
@@ -207,6 +237,8 @@ func registerBuildWSProxyInstruments() error {
|
|||||||
mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds",
|
mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds",
|
||||||
metric.WithDescription("Duration of completed proxy connections"),
|
metric.WithDescription("Duration of completed proxy connections"),
|
||||||
metric.WithUnit("s"))
|
metric.WithUnit("s"))
|
||||||
|
mProxyConnectionsTotal, _ = meter.Int64Counter("newt_proxy_connections_total",
|
||||||
|
metric.WithDescription("Proxy connection lifecycle events by protocol"))
|
||||||
// Register a default callback for build info if version/commit set
|
// Register a default callback for build info if version/commit set
|
||||||
reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
|
reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
|
||||||
if buildVersion == "" && buildCommit == "" {
|
if buildVersion == "" && buildCommit == "" {
|
||||||
@@ -219,7 +251,9 @@ func registerBuildWSProxyInstruments() error {
|
|||||||
if buildCommit != "" {
|
if buildCommit != "" {
|
||||||
attrs = append(attrs, attribute.String("commit", buildCommit))
|
attrs = append(attrs, attribute.String("commit", buildCommit))
|
||||||
}
|
}
|
||||||
attrs = append(attrs, siteAttrs()...)
|
if ShouldIncludeSiteLabels() {
|
||||||
|
attrs = append(attrs, siteAttrs()...)
|
||||||
|
}
|
||||||
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
|
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
|
||||||
return nil
|
return nil
|
||||||
}, mBuildInfo)
|
}, mBuildInfo)
|
||||||
@@ -229,6 +263,17 @@ func registerBuildWSProxyInstruments() error {
|
|||||||
// Provide a functional stopper that unregisters the callback
|
// Provide a functional stopper that unregisters the callback
|
||||||
obsStopper = func() { _ = reg.Unregister() }
|
obsStopper = func() { _ = reg.Unregister() }
|
||||||
}
|
}
|
||||||
|
if mWSConnected != nil {
|
||||||
|
if regConn, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
|
||||||
|
val := wsConnectedState.Load()
|
||||||
|
o.ObserveInt64(mWSConnected, val, metric.WithAttributes(attrsWithSite()...))
|
||||||
|
return nil
|
||||||
|
}, mWSConnected); err != nil {
|
||||||
|
otel.Handle(err)
|
||||||
|
} else {
|
||||||
|
wsConnStopper = func() { _ = regConn.Unregister() }
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -237,10 +282,11 @@ func registerBuildWSProxyInstruments() error {
|
|||||||
// heartbeat seconds, and active sessions.
|
// heartbeat seconds, and active sessions.
|
||||||
|
|
||||||
var (
|
var (
|
||||||
obsOnce sync.Once
|
obsOnce sync.Once
|
||||||
obsStopper func()
|
obsStopper func()
|
||||||
proxyObsOnce sync.Once
|
proxyObsOnce sync.Once
|
||||||
proxyStopper func()
|
proxyStopper func()
|
||||||
|
wsConnStopper func()
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetObservableCallback registers a single callback that will be invoked
|
// SetObservableCallback registers a single callback that will be invoked
|
||||||
@@ -251,7 +297,7 @@ var (
|
|||||||
//
|
//
|
||||||
// telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error {
|
// telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error {
|
||||||
// o.ObserveInt64(mSiteOnline, 1)
|
// o.ObserveInt64(mSiteOnline, 1)
|
||||||
// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds())
|
// o.ObserveFloat64(mSiteLastHeartbeat, float64(lastHB.Unix()))
|
||||||
// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)))
|
// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)))
|
||||||
// return nil
|
// return nil
|
||||||
// })
|
// })
|
||||||
@@ -290,8 +336,6 @@ func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error)
|
|||||||
func RegisterBuildInfo(version, commit string) {
|
func RegisterBuildInfo(version, commit string) {
|
||||||
buildVersion = version
|
buildVersion = version
|
||||||
buildCommit = commit
|
buildCommit = commit
|
||||||
// Increment restart count on boot
|
|
||||||
mRestartCount.Add(context.Background(), 1)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config reloads
|
// Config reloads
|
||||||
@@ -358,6 +402,25 @@ func IncWSKeepaliveFailure(ctx context.Context, reason string) {
|
|||||||
)...))
|
)...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetWSConnectionState updates the backing gauge for the WebSocket connected state.
|
||||||
|
func SetWSConnectionState(connected bool) {
|
||||||
|
if connected {
|
||||||
|
wsConnectedState.Store(1)
|
||||||
|
} else {
|
||||||
|
wsConnectedState.Store(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IncWSReconnect increments the WebSocket reconnect counter with a bounded reason label.
|
||||||
|
func IncWSReconnect(ctx context.Context, reason string) {
|
||||||
|
if reason == "" {
|
||||||
|
reason = "unknown"
|
||||||
|
}
|
||||||
|
mWSReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
|
||||||
|
attribute.String("reason", reason),
|
||||||
|
)...))
|
||||||
|
}
|
||||||
|
|
||||||
func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) {
|
func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) {
|
||||||
mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
|
mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
|
||||||
attribute.String("result", result),
|
attribute.String("result", result),
|
||||||
@@ -413,6 +476,21 @@ func ObserveProxyConnectionDuration(ctx context.Context, tunnelID, protocol, res
|
|||||||
mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...))
|
mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IncProxyConnectionEvent records proxy connection lifecycle events (opened/closed).
|
||||||
|
func IncProxyConnectionEvent(ctx context.Context, tunnelID, protocol, event string) {
|
||||||
|
if event == "" {
|
||||||
|
event = "unknown"
|
||||||
|
}
|
||||||
|
attrs := []attribute.KeyValue{
|
||||||
|
attribute.String("protocol", protocol),
|
||||||
|
attribute.String("event", event),
|
||||||
|
}
|
||||||
|
if ShouldIncludeTunnelID() && tunnelID != "" {
|
||||||
|
attrs = append(attrs, attribute.String("tunnel_id", tunnelID))
|
||||||
|
}
|
||||||
|
mProxyConnectionsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...))
|
||||||
|
}
|
||||||
|
|
||||||
// --- Config/PKI helpers ---
|
// --- Config/PKI helpers ---
|
||||||
|
|
||||||
func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) {
|
func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) {
|
||||||
|
|||||||
59
internal/telemetry/metrics_test_helper.go
Normal file
59
internal/telemetry/metrics_test_helper.go
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
package telemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func resetMetricsForTest() {
|
||||||
|
initOnce = sync.Once{}
|
||||||
|
obsOnce = sync.Once{}
|
||||||
|
proxyObsOnce = sync.Once{}
|
||||||
|
obsStopper = nil
|
||||||
|
proxyStopper = nil
|
||||||
|
if wsConnStopper != nil {
|
||||||
|
wsConnStopper()
|
||||||
|
}
|
||||||
|
wsConnStopper = nil
|
||||||
|
meter = nil
|
||||||
|
|
||||||
|
mSiteRegistrations = nil
|
||||||
|
mSiteOnline = nil
|
||||||
|
mSiteLastHeartbeat = nil
|
||||||
|
|
||||||
|
mTunnelSessions = nil
|
||||||
|
mTunnelBytes = nil
|
||||||
|
mTunnelLatency = nil
|
||||||
|
mReconnects = nil
|
||||||
|
|
||||||
|
mConnAttempts = nil
|
||||||
|
mConnErrors = nil
|
||||||
|
|
||||||
|
mConfigReloads = nil
|
||||||
|
mConfigApply = nil
|
||||||
|
mCertRotationTotal = nil
|
||||||
|
mProcessStartTime = nil
|
||||||
|
|
||||||
|
mBuildInfo = nil
|
||||||
|
|
||||||
|
mWSConnectLatency = nil
|
||||||
|
mWSMessages = nil
|
||||||
|
mWSDisconnects = nil
|
||||||
|
mWSKeepaliveFailure = nil
|
||||||
|
mWSSessionDuration = nil
|
||||||
|
mWSConnected = nil
|
||||||
|
mWSReconnects = nil
|
||||||
|
|
||||||
|
mProxyActiveConns = nil
|
||||||
|
mProxyBufferBytes = nil
|
||||||
|
mProxyAsyncBacklogByte = nil
|
||||||
|
mProxyDropsTotal = nil
|
||||||
|
mProxyAcceptsTotal = nil
|
||||||
|
mProxyConnDuration = nil
|
||||||
|
mProxyConnectionsTotal = nil
|
||||||
|
|
||||||
|
processStartUnix = float64(time.Now().UnixNano()) / 1e9
|
||||||
|
wsConnectedState.Store(0)
|
||||||
|
includeTunnelIDVal.Store(false)
|
||||||
|
includeSiteLabelVal.Store(false)
|
||||||
|
}
|
||||||
@@ -62,8 +62,8 @@ func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) {
|
|||||||
|
|
||||||
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
|
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
|
||||||
if t, ok := sv.LastHeartbeat(siteID); ok {
|
if t, ok := sv.LastHeartbeat(siteID); ok {
|
||||||
secs := time.Since(t).Seconds()
|
ts := float64(t.UnixNano()) / 1e9
|
||||||
o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(
|
o.ObserveFloat64(mSiteLastHeartbeat, ts, metric.WithAttributes(
|
||||||
attribute.String("site_id", siteID),
|
attribute.String("site_id", siteID),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -118,6 +118,11 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
|
|||||||
} else {
|
} else {
|
||||||
includeTunnelIDVal.Store(false)
|
includeTunnelIDVal.Store(false)
|
||||||
}
|
}
|
||||||
|
if getenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") == "true" {
|
||||||
|
includeSiteLabelVal.Store(true)
|
||||||
|
} else {
|
||||||
|
includeSiteLabelVal.Store(false)
|
||||||
|
}
|
||||||
res := buildResource(ctx, cfg)
|
res := buildResource(ctx, cfg)
|
||||||
UpdateSiteInfo(cfg.SiteID, cfg.Region)
|
UpdateSiteInfo(cfg.SiteID, cfg.Region)
|
||||||
|
|
||||||
@@ -294,7 +299,10 @@ func parseResourceAttributes(s string) map[string]string {
|
|||||||
// Global site/region used to enrich metric labels.
|
// Global site/region used to enrich metric labels.
|
||||||
var siteIDVal atomic.Value
|
var siteIDVal atomic.Value
|
||||||
var regionVal atomic.Value
|
var regionVal atomic.Value
|
||||||
var includeTunnelIDVal atomic.Value // bool; default true
|
var (
|
||||||
|
includeTunnelIDVal atomic.Value // bool; default true
|
||||||
|
includeSiteLabelVal atomic.Value // bool; default false
|
||||||
|
)
|
||||||
|
|
||||||
// UpdateSiteInfo updates the global site_id and region used for metric labels.
|
// UpdateSiteInfo updates the global site_id and region used for metric labels.
|
||||||
// Thread-safe via atomic.Value: subsequent metric emissions will include
|
// Thread-safe via atomic.Value: subsequent metric emissions will include
|
||||||
@@ -335,7 +343,12 @@ func siteAttrs() []attribute.KeyValue {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager).
|
// SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager).
|
||||||
func SiteLabelKVs() []attribute.KeyValue { return siteAttrs() }
|
func SiteLabelKVs() []attribute.KeyValue {
|
||||||
|
if !ShouldIncludeSiteLabels() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return siteAttrs()
|
||||||
|
}
|
||||||
|
|
||||||
// ShouldIncludeTunnelID returns whether tunnel_id labels should be emitted.
|
// ShouldIncludeTunnelID returns whether tunnel_id labels should be emitted.
|
||||||
func ShouldIncludeTunnelID() bool {
|
func ShouldIncludeTunnelID() bool {
|
||||||
@@ -345,6 +358,15 @@ func ShouldIncludeTunnelID() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ShouldIncludeSiteLabels returns whether site_id/region should be emitted as
|
||||||
|
// metric labels in addition to resource attributes.
|
||||||
|
func ShouldIncludeSiteLabels() bool {
|
||||||
|
if v, ok := includeSiteLabelVal.Load().(bool); ok {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func getenv(k, d string) string {
|
func getenv(k, d string) string {
|
||||||
if v := os.Getenv(k); v != "" {
|
if v := os.Getenv(k); v != "" {
|
||||||
return v
|
return v
|
||||||
|
|||||||
@@ -14,17 +14,23 @@ import (
|
|||||||
|
|
||||||
// Test that disallowed attributes are filtered from the exposition.
|
// Test that disallowed attributes are filtered from the exposition.
|
||||||
func TestAttributeFilterDropsUnknownKeys(t *testing.T) {
|
func TestAttributeFilterDropsUnknownKeys(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
|
resetMetricsForTest()
|
||||||
|
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
|
||||||
|
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
|
||||||
tel, err := Init(ctx, cfg)
|
tel, err := Init(ctx, cfg)
|
||||||
if err != nil { t.Fatalf("init: %v", err) }
|
if err != nil {
|
||||||
|
t.Fatalf("init: %v", err)
|
||||||
|
}
|
||||||
defer func() { _ = tel.Shutdown(context.Background()) }()
|
defer func() { _ = tel.Shutdown(context.Background()) }()
|
||||||
|
|
||||||
if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") }
|
if tel.PrometheusHandler == nil {
|
||||||
|
t.Fatalf("prom handler nil")
|
||||||
|
}
|
||||||
ts := httptest.NewServer(tel.PrometheusHandler)
|
ts := httptest.NewServer(tel.PrometheusHandler)
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
// Add samples with disallowed attribute keys
|
// Add samples with disallowed attribute keys
|
||||||
for _, k := range []string{"forbidden", "site_id", "host"} {
|
for _, k := range []string{"forbidden", "site_id", "host"} {
|
||||||
set := attribute.NewSet(attribute.String(k, "x"))
|
set := attribute.NewSet(attribute.String(k, "x"))
|
||||||
AddTunnelBytesSet(ctx, 123, set)
|
AddTunnelBytesSet(ctx, 123, set)
|
||||||
@@ -32,7 +38,9 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
|
|||||||
time.Sleep(50 * time.Millisecond)
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
resp, err := http.Get(ts.URL)
|
resp, err := http.Get(ts.URL)
|
||||||
if err != nil { t.Fatalf("GET: %v", err) }
|
if err != nil {
|
||||||
|
t.Fatalf("GET: %v", err)
|
||||||
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
b, _ := io.ReadAll(resp.Body)
|
b, _ := io.ReadAll(resp.Body)
|
||||||
body := string(b)
|
body := string(b)
|
||||||
@@ -43,4 +51,3 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
|
|||||||
t.Fatalf("expected allowed attribute site_id to be present in metrics, got: %s", body)
|
t.Fatalf("expected allowed attribute site_id to be present in metrics, got: %s", body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@@ -15,36 +16,61 @@ import (
|
|||||||
// Golden test that /metrics contains expected metric names.
|
// Golden test that /metrics contains expected metric names.
|
||||||
func TestMetricsGoldenContains(t *testing.T) {
|
func TestMetricsGoldenContains(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", BuildVersion: "test"}
|
resetMetricsForTest()
|
||||||
|
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
|
||||||
|
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", BuildVersion: "test"}
|
||||||
tel, err := Init(ctx, cfg)
|
tel, err := Init(ctx, cfg)
|
||||||
if err != nil { t.Fatalf("telemetry init error: %v", err) }
|
if err != nil {
|
||||||
|
t.Fatalf("telemetry init error: %v", err)
|
||||||
|
}
|
||||||
defer func() { _ = tel.Shutdown(context.Background()) }()
|
defer func() { _ = tel.Shutdown(context.Background()) }()
|
||||||
|
|
||||||
if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") }
|
if tel.PrometheusHandler == nil {
|
||||||
|
t.Fatalf("prom handler nil")
|
||||||
|
}
|
||||||
ts := httptest.NewServer(tel.PrometheusHandler)
|
ts := httptest.NewServer(tel.PrometheusHandler)
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
// Trigger a counter
|
// Trigger counters to ensure they appear in the scrape
|
||||||
IncConnAttempt(ctx, "websocket", "success")
|
IncConnAttempt(ctx, "websocket", "success")
|
||||||
|
IncWSReconnect(ctx, "io_error")
|
||||||
|
IncProxyConnectionEvent(ctx, "", "tcp", ProxyConnectionOpened)
|
||||||
|
if tel.MeterProvider != nil {
|
||||||
|
_ = tel.MeterProvider.ForceFlush(ctx)
|
||||||
|
}
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
resp, err := http.Get(ts.URL)
|
var body string
|
||||||
if err != nil { t.Fatalf("GET metrics failed: %v", err) }
|
for i := 0; i < 5; i++ {
|
||||||
defer resp.Body.Close()
|
resp, err := http.Get(ts.URL)
|
||||||
b, _ := io.ReadAll(resp.Body)
|
if err != nil {
|
||||||
body := string(b)
|
t.Fatalf("GET metrics failed: %v", err)
|
||||||
|
}
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
body = string(b)
|
||||||
|
if strings.Contains(body, "newt_connection_attempts_total") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
f, err := os.Open("internal/telemetry/testdata/expected_contains.golden")
|
f, err := os.Open(filepath.Join("testdata", "expected_contains.golden"))
|
||||||
if err != nil { t.Fatalf("read golden: %v", err) }
|
if err != nil {
|
||||||
|
t.Fatalf("read golden: %v", err)
|
||||||
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
s := bufio.NewScanner(f)
|
s := bufio.NewScanner(f)
|
||||||
for s.Scan() {
|
for s.Scan() {
|
||||||
needle := strings.TrimSpace(s.Text())
|
needle := strings.TrimSpace(s.Text())
|
||||||
if needle == "" { continue }
|
if needle == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if !strings.Contains(body, needle) {
|
if !strings.Contains(body, needle) {
|
||||||
t.Fatalf("expected metrics body to contain %q. body=\n%s", needle, body)
|
t.Fatalf("expected metrics body to contain %q. body=\n%s", needle, body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := s.Err(); err != nil { t.Fatalf("scan golden: %v", err) }
|
if err := s.Err(); err != nil {
|
||||||
|
t.Fatalf("scan golden: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,13 +13,15 @@ import (
|
|||||||
// Smoke test that /metrics contains at least one newt_* metric when Prom exporter is enabled.
|
// Smoke test that /metrics contains at least one newt_* metric when Prom exporter is enabled.
|
||||||
func TestMetricsSmoke(t *testing.T) {
|
func TestMetricsSmoke(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
resetMetricsForTest()
|
||||||
|
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
|
||||||
cfg := Config{
|
cfg := Config{
|
||||||
ServiceName: "newt",
|
ServiceName: "newt",
|
||||||
PromEnabled: true,
|
PromEnabled: true,
|
||||||
OTLPEnabled: false,
|
OTLPEnabled: false,
|
||||||
AdminAddr: "127.0.0.1:0",
|
AdminAddr: "127.0.0.1:0",
|
||||||
BuildVersion: "test",
|
BuildVersion: "test",
|
||||||
BuildCommit: "deadbeef",
|
BuildCommit: "deadbeef",
|
||||||
MetricExportInterval: 5 * time.Second,
|
MetricExportInterval: 5 * time.Second,
|
||||||
}
|
}
|
||||||
tel, err := Init(ctx, cfg)
|
tel, err := Init(ctx, cfg)
|
||||||
@@ -37,18 +39,27 @@ func TestMetricsSmoke(t *testing.T) {
|
|||||||
|
|
||||||
// Record a simple metric and then fetch /metrics
|
// Record a simple metric and then fetch /metrics
|
||||||
IncConnAttempt(ctx, "websocket", "success")
|
IncConnAttempt(ctx, "websocket", "success")
|
||||||
|
if tel.MeterProvider != nil {
|
||||||
|
_ = tel.MeterProvider.ForceFlush(ctx)
|
||||||
|
}
|
||||||
// Give the exporter a tick to collect
|
// Give the exporter a tick to collect
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
resp, err := http.Get(ts.URL)
|
var body string
|
||||||
if err != nil {
|
for i := 0; i < 5; i++ {
|
||||||
t.Fatalf("GET /metrics failed: %v", err)
|
resp, err := http.Get(ts.URL)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GET /metrics failed: %v", err)
|
||||||
|
}
|
||||||
|
b, _ := io.ReadAll(resp.Body)
|
||||||
|
_ = resp.Body.Close()
|
||||||
|
body = string(b)
|
||||||
|
if strings.Contains(body, "newt_connection_attempts_total") {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
b, _ := io.ReadAll(resp.Body)
|
|
||||||
body := string(b)
|
|
||||||
if !strings.Contains(body, "newt_connection_attempts_total") {
|
if !strings.Contains(body, "newt_connection_attempts_total") {
|
||||||
t.Fatalf("expected newt_connection_attempts_total in metrics, got:\n%s", body)
|
t.Fatalf("expected newt_connection_attempts_total in metrics, got:\n%s", body)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,3 +1,7 @@
|
|||||||
newt_connection_attempts_total
|
newt_connection_attempts_total
|
||||||
|
newt_websocket_connected
|
||||||
|
newt_websocket_reconnects_total
|
||||||
|
newt_proxy_connections_total
|
||||||
newt_build_info
|
newt_build_info
|
||||||
|
|
||||||
|
process_start_time_seconds
|
||||||
|
|||||||
@@ -509,6 +509,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
|||||||
|
|
||||||
tunnelID := pm.currentTunnelID
|
tunnelID := pm.currentTunnelID
|
||||||
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "")
|
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "")
|
||||||
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionOpened)
|
||||||
if tunnelID != "" {
|
if tunnelID != "" {
|
||||||
state.Global().IncSessions(tunnelID)
|
state.Global().IncSessions(tunnelID)
|
||||||
if e := pm.getEntry(tunnelID); e != nil {
|
if e := pm.getEntry(tunnelID); e != nil {
|
||||||
@@ -523,6 +524,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
|||||||
logger.Error("Error connecting to target: %v", err)
|
logger.Error("Error connecting to target: %v", err)
|
||||||
accepted.Close()
|
accepted.Close()
|
||||||
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err))
|
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err))
|
||||||
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed)
|
||||||
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds())
|
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -556,6 +558,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds())
|
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds())
|
||||||
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed)
|
||||||
}(tunnelID, conn)
|
}(tunnelID, conn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -631,6 +634,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
}
|
}
|
||||||
tunnelID := pm.currentTunnelID
|
tunnelID := pm.currentTunnelID
|
||||||
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
|
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
|
||||||
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
|
||||||
// Only increment activeUDP after a successful DialUDP
|
// Only increment activeUDP after a successful DialUDP
|
||||||
if e := pm.getEntry(tunnelID); e != nil {
|
if e := pm.getEntry(tunnelID); e != nil {
|
||||||
e.activeUDP.Add(1)
|
e.activeUDP.Add(1)
|
||||||
@@ -655,6 +659,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
}
|
}
|
||||||
clientsMutex.Unlock()
|
clientsMutex.Unlock()
|
||||||
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds())
|
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds())
|
||||||
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
buffer := make([]byte, 65507)
|
buffer := make([]byte, 65507)
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ probe "newt_* presence" "^newt_" || true
|
|||||||
|
|
||||||
# Site gauges with site_id
|
# Site gauges with site_id
|
||||||
probe "site_online with site_id" "^newt_site_online\{.*site_id=\"[^\"]+\"" || true
|
probe "site_online with site_id" "^newt_site_online\{.*site_id=\"[^\"]+\"" || true
|
||||||
probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_seconds\{.*site_id=\"[^\"]+\"" || true
|
probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_timestamp_seconds\{.*site_id=\"[^\"]+\"" || true
|
||||||
|
|
||||||
# Bytes with direction ingress/egress and protocol
|
# Bytes with direction ingress/egress and protocol
|
||||||
probe "tunnel bytes ingress" "^newt_tunnel_bytes_total\{.*direction=\"ingress\".*protocol=\"(tcp|udp)\"" || true
|
probe "tunnel bytes ingress" "^newt_tunnel_bytes_total\{.*direction=\"ingress\".*protocol=\"(tcp|udp)\"" || true
|
||||||
@@ -39,11 +39,14 @@ fi
|
|||||||
# WebSocket metrics (when OTLP/WS used)
|
# WebSocket metrics (when OTLP/WS used)
|
||||||
probe "websocket connect latency buckets" "^newt_websocket_connect_latency_seconds_bucket" || true
|
probe "websocket connect latency buckets" "^newt_websocket_connect_latency_seconds_bucket" || true
|
||||||
probe "websocket messages total" "^newt_websocket_messages_total\{.*(direction|msg_type)=" || true
|
probe "websocket messages total" "^newt_websocket_messages_total\{.*(direction|msg_type)=" || true
|
||||||
|
probe "websocket connected gauge" "^newt_websocket_connected" || true
|
||||||
|
probe "websocket reconnects total" "^newt_websocket_reconnects_total\{" || true
|
||||||
|
|
||||||
# Proxy metrics (when proxy active)
|
# Proxy metrics (when proxy active)
|
||||||
probe "proxy active connections" "^newt_proxy_active_connections\{" || true
|
probe "proxy active connections" "^newt_proxy_active_connections\{" || true
|
||||||
probe "proxy buffer bytes" "^newt_proxy_buffer_bytes\{" || true
|
probe "proxy buffer bytes" "^newt_proxy_buffer_bytes\{" || true
|
||||||
probe "proxy drops total" "^newt_proxy_drops_total\{" || true
|
probe "proxy drops total" "^newt_proxy_drops_total\{" || true
|
||||||
|
probe "proxy connections total" "^newt_proxy_connections_total\{" || true
|
||||||
|
|
||||||
# Config apply
|
# Config apply
|
||||||
probe "config apply seconds buckets" "^newt_config_apply_seconds_bucket\{" || true
|
probe "config apply seconds buckets" "^newt_config_apply_seconds_bucket\{" || true
|
||||||
|
|||||||
@@ -167,6 +167,7 @@ func (c *Client) Close() error {
|
|||||||
|
|
||||||
// Set connection status to false
|
// Set connection status to false
|
||||||
c.setConnected(false)
|
c.setConnected(false)
|
||||||
|
telemetry.SetWSConnectionState(false)
|
||||||
|
|
||||||
// Close the WebSocket connection gracefully
|
// Close the WebSocket connection gracefully
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
@@ -508,6 +509,7 @@ func (c *Client) establishConnection() error {
|
|||||||
} else {
|
} else {
|
||||||
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError)
|
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError)
|
||||||
}
|
}
|
||||||
|
telemetry.IncWSReconnect(ctx, etype)
|
||||||
return fmt.Errorf("failed to connect to WebSocket: %w", err)
|
return fmt.Errorf("failed to connect to WebSocket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -515,6 +517,7 @@ func (c *Client) establishConnection() error {
|
|||||||
telemetry.ObserveWSConnectLatency(ctx, lat, "success", "")
|
telemetry.ObserveWSConnectLatency(ctx, lat, "success", "")
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
c.setConnected(true)
|
c.setConnected(true)
|
||||||
|
telemetry.SetWSConnectionState(true)
|
||||||
c.setMetricsContext(ctx)
|
c.setMetricsContext(ctx)
|
||||||
sessionStart := time.Now()
|
sessionStart := time.Now()
|
||||||
// Wire up pong handler for metrics
|
// Wire up pong handler for metrics
|
||||||
@@ -632,6 +635,7 @@ func (c *Client) pingMonitor() {
|
|||||||
default:
|
default:
|
||||||
logger.Error("Ping failed: %v", err)
|
logger.Error("Ping failed: %v", err)
|
||||||
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
|
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
|
||||||
|
telemetry.IncWSReconnect(c.metricsContext(), "ping_write")
|
||||||
c.reconnect()
|
c.reconnect()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -660,6 +664,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
|
|||||||
// Shutting down, don't reconnect
|
// Shutting down, don't reconnect
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
|
telemetry.IncWSReconnect(ctx, disconnectReason)
|
||||||
c.reconnect()
|
c.reconnect()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@@ -710,6 +715,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
|
|||||||
|
|
||||||
func (c *Client) reconnect() {
|
func (c *Client) reconnect() {
|
||||||
c.setConnected(false)
|
c.setConnected(false)
|
||||||
|
telemetry.SetWSConnectionState(false)
|
||||||
if c.conn != nil {
|
if c.conn != nil {
|
||||||
c.conn.Close()
|
c.conn.Close()
|
||||||
c.conn = nil
|
c.conn = nil
|
||||||
|
|||||||
Reference in New Issue
Block a user