Merge pull request #11 from marcschaeferger/codex/implement-review-suggestions-for-newt-code

Adjust telemetry metrics for heartbeat timestamps and uptime
This commit is contained in:
Marc Schäfer
2025-10-10 19:18:49 +02:00
committed by GitHub
14 changed files with 323 additions and 96 deletions

View File

@@ -6,20 +6,20 @@ This document captures the current state of Newt metrics, prioritized fixes, and
- Export: Prometheus exposition (default), optional OTLP (gRPC)
- Existing instruments:
- Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_seconds
- Sites: newt_site_registrations_total, newt_site_online (0/1), newt_site_last_heartbeat_timestamp_seconds
- Tunnel/Traffic: newt_tunnel_sessions, newt_tunnel_bytes_total, newt_tunnel_latency_seconds, newt_tunnel_reconnects_total
- Connection lifecycle: newt_connection_attempts_total, newt_connection_errors_total
- Operations: newt_config_reloads_total, newt_restart_count_total, newt_build_info
- Operations: newt_config_reloads_total, newt_restart_count_total, newt_config_apply_seconds, newt_cert_rotation_total
- Operations: newt_config_reloads_total, process_start_time_seconds, newt_build_info
- Operations: newt_config_reloads_total, process_start_time_seconds, newt_config_apply_seconds, newt_cert_rotation_total
- Build metadata: newt_build_info
- Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total
- Control plane: newt_websocket_connect_latency_seconds, newt_websocket_messages_total, newt_websocket_connected, newt_websocket_reconnects_total
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_async_backlog_bytes, newt_proxy_drops_total, newt_proxy_accept_total, newt_proxy_connection_duration_seconds, newt_proxy_connections_total
- Go runtime: GC, heap, goroutines via runtime instrumentation
2) Main issues addressed now
- Attribute filter (allow-list) extended to include site_id and region in addition to existing keys (tunnel_id, transport, protocol, direction, result, reason, error_type, version, commit).
- site_id and region propagation: site_id is now attached as a metric label across newt_*; region is added as a metric label when set. Both remain resource attributes for consistency with OTEL.
- site_id and region propagation: site_id/region remain resource attributes. Metric labels mirror them on per-site gauges and counters by default; set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them for multi-tenant scrapes.
- Label semantics clarified:
- transport: control-plane mechanism (e.g., websocket, wireguard)
- protocol: L4 payload type (tcp, udp)
@@ -29,10 +29,9 @@ This document captures the current state of Newt metrics, prioritized fixes, and
3) Remaining gaps and deviations
- Some call sites still need initiator label on reconnect outcomes (client vs server). This is planned.
- WebSocket and Proxy metrics (connect latency, messages, active connections, buffer/drops, async backlog) are planned additions.
- Config apply duration and cert rotation counters are planned.
- Registration and config reload failures are not yet emitted; add failure code paths so result labels expose churn.
- Restart counter increments only when build metadata is provided; consider decoupling to count all boots.
- Document using `process_start_time_seconds` (and `time()` in PromQL) to derive uptime; no explicit restart counter is needed.
- Metric helpers often use `context.Background()`. Where lightweight contexts exist (e.g., HTTP handlers), propagate them to ease future correlation.
- Tracing coverage is limited to admin HTTP and WebSocket connect spans; extend to blueprint fetches, proxy accept loops, and WireGuard updates when OTLP is enabled.
@@ -44,8 +43,6 @@ This document captures the current state of Newt metrics, prioritized fixes, and
- Correct label semantics (transport vs protocol); fix sessions transport labelling
- Documentation alignment
- Phase 2 (next)
- WebSocket: newt_websocket_connect_latency_seconds; newt_websocket_messages_total{direction,msg_type}
- Proxy: newt_proxy_active_connections, newt_proxy_buffer_bytes, newt_proxy_drops_total, newt_proxy_async_backlog_bytes
- Reconnect: add initiator label (client/server)
- Config & PKI: newt_config_apply_seconds{phase,result}; newt_cert_rotation_total{result}
- WebSocket disconnect and keepalive failure counters
@@ -66,7 +63,7 @@ This document captures the current state of Newt metrics, prioritized fixes, and
- Sustained connection errors:
- rate(newt_connection_errors_total[5m]) by (site_id,transport,error_type)
- Heartbeat gaps:
- max_over_time(newt_site_last_heartbeat_seconds[15m]) by (site_id)
- max_over_time(time() - newt_site_last_heartbeat_timestamp_seconds[15m]) by (site_id)
- Proxy drops:
- increase(newt_proxy_drops_total[5m]) by (site_id,protocol)
- WebSocket connect p95 (when added):

View File

@@ -27,6 +27,7 @@ Enable exporters via environment variables (no code changes required)
- OTEL_RESOURCE_ATTRIBUTES=service.instance.id=<id>,site_id=<id>
- OTEL_METRIC_EXPORT_INTERVAL=15s (default)
- NEWT_ADMIN_ADDR=127.0.0.1:2112 (default admin HTTP with /metrics)
- NEWT_METRICS_INCLUDE_SITE_LABELS=true|false (default: true; disable to drop site_id/region as metric labels and rely on resource attributes only)
Runtime behavior
@@ -36,12 +37,14 @@ Runtime behavior
Metric catalog (current)
Unless otherwise noted, `site_id` and `region` are available via resource attributes and, by default, as metric labels. Set `NEWT_METRICS_INCLUDE_SITE_LABELS=false` to drop them from counter/histogram label sets in high-cardinality environments.
| Metric | Instrument | Key attributes | Purpose | Example |
| --- | --- | --- | --- | --- |
| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0",site_id="acme-edge-1"} 1` |
| `newt_build_info` | Observable gauge (Int64) | `version`, `commit`, `site_id`, `region` (optional when site labels enabled) | Emits build metadata with value `1` for scrape-time verification. | `newt_build_info{version="1.5.0"} 1` |
| `newt_site_registrations_total` | Counter (Int64) | `result` (`success`/`failure`), `site_id`, `region` (optional) | Counts Pangolin registration attempts. | `newt_site_registrations_total{result="success",site_id="acme-edge-1"} 1` |
| `newt_site_online` | Observable gauge (Int64) | `site_id` | Reports whether the site is currently connected (`1`) or offline (`0`). | `newt_site_online{site_id="acme-edge-1"} 1` |
| `newt_site_last_heartbeat_seconds` | Observable gauge (Float64) | `site_id` | Time since the most recent Pangolin heartbeat. | `newt_site_last_heartbeat_seconds{site_id="acme-edge-1"} 2.4` |
| `newt_site_last_heartbeat_timestamp_seconds` | Observable gauge (Float64) | `site_id` | Unix timestamp of the most recent Pangolin heartbeat (derive age via `time() - metric`). | `newt_site_last_heartbeat_timestamp_seconds{site_id="acme-edge-1"} 1.728e+09` |
| `newt_tunnel_sessions` | Observable gauge (Int64) | `site_id`, `tunnel_id` (when enabled) | Counts active tunnel sessions per peer; collapses to per-site when tunnel IDs are disabled. | `newt_tunnel_sessions{site_id="acme-edge-1",tunnel_id="wgpub..."} 3` |
| `newt_tunnel_bytes_total` | Counter (Int64) | `direction` (`ingress`/`egress`), `protocol` (`tcp`/`udp`), `tunnel_id` (optional), `site_id`, `region` (optional) | Measures proxied traffic volume across tunnels. | `newt_tunnel_bytes_total{direction="ingress",protocol="tcp",site_id="acme-edge-1"} 4096` |
| `newt_tunnel_latency_seconds` | Histogram (Float64) | `transport` (e.g., `wireguard`), `tunnel_id` (optional), `site_id`, `region` (optional) | Captures RTT or configuration-driven latency samples. | `newt_tunnel_latency_seconds_bucket{transport="wireguard",le="0.5"} 42` |
@@ -49,15 +52,18 @@ Metric catalog (current)
| `newt_connection_attempts_total` | Counter (Int64) | `transport` (`auth`/`websocket`), `result`, `site_id`, `region` (optional) | Measures control-plane dial attempts and their outcomes. | `newt_connection_attempts_total{transport="websocket",result="success",site_id="acme-edge-1"} 8` |
| `newt_connection_errors_total` | Counter (Int64) | `transport`, `error_type`, `site_id`, `region` (optional) | Buckets connection failures by normalized error class. | `newt_connection_errors_total{transport="websocket",error_type="tls_handshake",site_id="acme-edge-1"} 1` |
| `newt_config_reloads_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Counts remote blueprint/config reloads. | `newt_config_reloads_total{result="success",site_id="acme-edge-1"} 3` |
| `newt_restart_count_total` | Counter (Int64) | `site_id`, `region` (optional) | Increments once per process boot to detect restarts. | `newt_restart_count_total{site_id="acme-edge-1"} 1` |
| `process_start_time_seconds` | Observable gauge (Float64) | — | Unix timestamp of the Newt process start time (use `time() - process_start_time_seconds` for uptime). | `process_start_time_seconds 1.728e+09` |
| `newt_config_apply_seconds` | Histogram (Float64) | `phase` (`interface`/`peer`), `result`, `site_id`, `region` (optional) | Measures time spent applying WireGuard configuration phases. | `newt_config_apply_seconds_sum{phase="peer",result="success",site_id="acme-edge-1"} 0.48` |
| `newt_cert_rotation_total` | Counter (Int64) | `result`, `site_id`, `region` (optional) | Tracks client certificate rotation attempts. | `newt_cert_rotation_total{result="success",site_id="acme-edge-1"} 2` |
| `newt_websocket_connect_latency_seconds` | Histogram (Float64) | `transport="websocket"`, `result`, `error_type` (on failure), `site_id`, `region` (optional) | Measures WebSocket dial latency and exposes failure buckets. | `newt_websocket_connect_latency_seconds_bucket{result="success",le="0.5",site_id="acme-edge-1"} 9` |
| `newt_websocket_messages_total` | Counter (Int64) | `direction` (`in`/`out`), `msg_type` (`text`/`ping`/`pong`), `site_id`, `region` (optional) | Accounts for control WebSocket traffic volume by type. | `newt_websocket_messages_total{direction="out",msg_type="ping",site_id="acme-edge-1"} 12` |
| `newt_websocket_connected` | Observable gauge (Int64) | `site_id`, `region` (optional) | Reports current WebSocket connectivity (`1` when connected). | `newt_websocket_connected{site_id="acme-edge-1"} 1` |
| `newt_websocket_reconnects_total` | Counter (Int64) | `reason` (`tls_handshake`, `dial_timeout`, `io_error`, `ping_write`, `timeout`, etc.), `site_id`, `region` (optional) | Counts reconnect attempts with normalized reasons for failure analysis. | `newt_websocket_reconnects_total{reason="timeout",site_id="acme-edge-1"} 3` |
| `newt_proxy_active_connections` | Observable gauge (Int64) | `protocol` (`tcp`/`udp`), `direction` (`ingress`/`egress`), `tunnel_id` (optional), `site_id`, `region` (optional) | Current proxy connections per tunnel and protocol. | `newt_proxy_active_connections{protocol="tcp",direction="egress",site_id="acme-edge-1"} 4` |
| `newt_proxy_buffer_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Volume of buffered data awaiting flush in proxy queues. | `newt_proxy_buffer_bytes{protocol="udp",direction="egress",site_id="acme-edge-1"} 2048` |
| `newt_proxy_async_backlog_bytes` | Observable gauge (Int64) | `protocol`, `direction`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks async write backlog when deferred flushing is enabled. | `newt_proxy_async_backlog_bytes{protocol="tcp",direction="egress",site_id="acme-edge-1"} 512` |
| `newt_proxy_drops_total` | Counter (Int64) | `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Counts proxy drop events caused by downstream write errors. | `newt_proxy_drops_total{protocol="udp",site_id="acme-edge-1"} 1` |
| `newt_proxy_connections_total` | Counter (Int64) | `event` (`opened`/`closed`), `protocol`, `tunnel_id` (optional), `site_id`, `region` (optional) | Tracks proxy connection lifecycle events for rate/SLO calculations. | `newt_proxy_connections_total{event="opened",protocol="tcp",site_id="acme-edge-1"} 10` |
Conventions
@@ -174,7 +180,7 @@ sum(newt_tunnel_sessions)
Compatibility notes
- Gauges do not use the _total suffix (e.g., newt_tunnel_sessions).
- site_id is emitted as both resource attribute and metric label on all newt_* series; region is included as a metric label only when set. tunnel_id is a metric label (WireGuard public key). Never expose secrets in labels.
- site_id/region remain resource attributes. Metric labels for these fields appear on per-site gauges (e.g., `newt_site_online`) and, by default, on counters/histograms; disable them with `NEWT_METRICS_INCLUDE_SITE_LABELS=false` if needed. `tunnel_id` is a metric label (WireGuard public key). Never expose secrets in labels.
- NEWT_METRICS_INCLUDE_TUNNEL_ID (default: true) toggles whether tunnel_id is included as a label on bytes/sessions/proxy/reconnect metrics. Disable in high-cardinality environments.
- Avoid double-scraping: scrape either Newt (/metrics) or the Collector's Prometheus exporter, not both.
- Prometheus does not accept remote_write; use Mimir/Cortex/VM/Thanos-Receive for remote_write.

View File

@@ -16,7 +16,7 @@ A global attribute filter (see `buildMeterProvider`) constrains exposed label ke
exports stay bounded.
- **Site lifecycle**: `newt_site_registrations_total`, `newt_site_online`, and
`newt_site_last_heartbeat_seconds` capture registration attempts and liveness. They
`newt_site_last_heartbeat_timestamp_seconds` capture registration attempts and liveness. They
are fed either manually (`IncSiteRegistration`) or via the `TelemetryView` state
callback that publishes observable gauges for the active site.
- **Tunnel health and usage**: Counters and histograms track bytes, latency, reconnects,
@@ -27,17 +27,20 @@ exports stay bounded.
`newt_connection_errors_total` are emitted throughout the WebSocket client to classify
authentication, dial, and transport failures.
- **Operations/configuration**: `newt_config_reloads_total`,
`newt_restart_count_total`, `newt_config_apply_seconds`, and
`process_start_time_seconds`, `newt_config_apply_seconds`, and
`newt_cert_rotation_total` provide visibility into blueprint reloads, process boots,
configuration timings, and certificate rotation outcomes.
- **Build metadata**: `newt_build_info` records the binary version/commit together
with a monotonic restart counter when build information is supplied at startup.
- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds` and
`newt_websocket_messages_total` report connect latency and ping/pong/text activity.
with optional site metadata when build information is supplied at startup.
- **WebSocket control-plane**: `newt_websocket_connect_latency_seconds`,
`newt_websocket_messages_total`, `newt_websocket_connected`, and
`newt_websocket_reconnects_total` report connect latency, ping/pong/text activity,
connection state, and reconnect reasons.
- **Proxy data-plane**: Observable gauges (`newt_proxy_active_connections`,
`newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) and the
`newt_proxy_drops_total` counter are fed from the proxy manager to monitor backlog
and drop behaviour alongside per-protocol byte counters.
`newt_proxy_buffer_bytes`, `newt_proxy_async_backlog_bytes`) plus counters for
drops, accepts, connection lifecycle events (`newt_proxy_connections_total`), and
duration histograms (`newt_proxy_connection_duration_seconds`) surface backlog,
drop behaviour, and churn alongside per-protocol byte counters.
Refer to `docs/observability.md` for a tabular catalogue with instrument types,
attributes, and sample exposition lines.
@@ -61,8 +64,9 @@ The implementation adheres to most OTel Go recommendations:
suffixes for counters and `_seconds`/`_bytes` unit conventions. Histograms are
registered with explicit second-based buckets.
- **Resource attributes** Service name/version and optional `site_id`/`region`
populate the `resource.Resource` and are also injected as metric attributes for
compatibility with Prometheus queries.
populate the `resource.Resource`. Metric labels mirror these by default (and on
per-site gauges) but can be disabled with `NEWT_METRICS_INCLUDE_SITE_LABELS=false`
to avoid unnecessary cardinality growth.
- **Attribute hygiene** A single attribute filter (`sdkmetric.WithView`) enforces
the allow-list of label keys to prevent accidental high-cardinality emission.
- **Runtime metrics** Go runtime instrumentation is enabled automatically through
@@ -83,10 +87,9 @@ The review identified a few actionable adjustments:
2. **Surface config reload failures** `telemetry.IncConfigReload` is invoked with
`result="success"` only. Callers should record a failure result when blueprint
parsing or application aborts before success counters are incremented.
3. **Harmonise restart count behaviour** `newt_restart_count_total` increments only
when build metadata is provided. Consider moving the increment out of
`RegisterBuildInfo` so the counter advances even for ad-hoc builds without version
strings.
3. **Expose robust uptime** Document using `time() - process_start_time_seconds`
to derive uptime now that the restart counter has been replaced with a timestamp
gauge.
4. **Propagate contexts where available** Many emitters call metric helpers with
`context.Background()`. Passing real contexts (when inexpensive) would allow future
exporters to correlate spans and metrics.
@@ -98,17 +101,17 @@ The review identified a few actionable adjustments:
Prioritised additions that would close visibility gaps:
1. **WebSocket disconnect outcomes** A counter (e.g., `newt_websocket_disconnects_total`)
partitioned by `reason` would complement the existing connect latency histogram and
explain reconnect storms.
2. **Keepalive/heartbeat failures** Counting ping timeouts or heartbeat misses would
make `newt_site_last_heartbeat_seconds` actionable by providing discrete events.
3. **Proxy connection lifecycle** Add counters/histograms for proxy accept events and
connection durations to correlate drops with load and backlog metrics.
1. **Config reload error taxonomy** Split reload attempts into a dedicated
`newt_config_reload_errors_total{phase}` counter to make blueprint validation failures
visible alongside the existing success counter.
2. **Config source visibility** Export `newt_config_source_info{source,version}` so
operators can audit the active blueprint origin/commit during incidents.
3. **Certificate expiry** Emit `newt_cert_expiry_timestamp_seconds` (per cert) to
enable proactive alerts before mTLS credentials lapse.
4. **Blueprint/config pull latency** Measuring Pangolin blueprint fetch durations and
HTTP status distribution would expose slow control-plane operations.
5. **Certificate rotation attempts** Complement `newt_cert_rotation_total` with a
duration histogram to observe slow PKI updates and detect stuck rotations.
5. **Tunnel setup latency** Histograms for DNS resolution and tunnel handshakes would
help correlate connect latency spikes with network dependencies.
These metrics rely on data that is already available in the code paths mentioned
above and would round out operational dashboards.

View File

@@ -3,6 +3,8 @@ package telemetry
import (
"context"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
@@ -37,9 +39,9 @@ var (
// Config/Restart
mConfigReloads metric.Int64Counter
mRestartCount metric.Int64Counter
mConfigApply metric.Float64Histogram
mCertRotationTotal metric.Int64Counter
mProcessStartTime metric.Float64ObservableGauge
// Build info
mBuildInfo metric.Int64ObservableGauge
@@ -50,6 +52,8 @@ var (
mWSDisconnects metric.Int64Counter
mWSKeepaliveFailure metric.Int64Counter
mWSSessionDuration metric.Float64Histogram
mWSConnected metric.Int64ObservableGauge
mWSReconnects metric.Int64Counter
// Proxy
mProxyActiveConns metric.Int64ObservableGauge
@@ -58,16 +62,28 @@ var (
mProxyDropsTotal metric.Int64Counter
mProxyAcceptsTotal metric.Int64Counter
mProxyConnDuration metric.Float64Histogram
mProxyConnectionsTotal metric.Int64Counter
buildVersion string
buildCommit string
processStartUnix = float64(time.Now().UnixNano()) / 1e9
wsConnectedState atomic.Int64
)
// attrsWithSite appends global site/region labels when present.
// Proxy connection lifecycle events.
const (
ProxyConnectionOpened = "opened"
ProxyConnectionClosed = "closed"
)
// attrsWithSite appends site/region labels only when explicitly enabled to keep
// label cardinality low by default.
func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue {
attrs := make([]attribute.KeyValue, 0, len(extra)+2)
attrs = append(attrs, extra...)
attrs := make([]attribute.KeyValue, len(extra))
copy(attrs, extra)
if ShouldIncludeSiteLabels() {
attrs = append(attrs, siteAttrs()...)
}
return attrs
}
@@ -111,8 +127,9 @@ func registerSiteInstruments() error {
if err != nil {
return err
}
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds",
metric.WithDescription("Seconds since last site heartbeat"))
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_timestamp_seconds",
metric.WithDescription("Unix timestamp of the last site heartbeat"),
metric.WithUnit("s"))
if err != nil {
return err
}
@@ -164,13 +181,22 @@ func registerConnInstruments() error {
func registerConfigInstruments() error {
mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total",
metric.WithDescription("Configuration reloads"))
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
metric.WithDescription("Process restart count (incremented on start)"))
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
metric.WithDescription("Configuration apply duration in seconds"),
metric.WithUnit("s"))
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
metric.WithDescription("Certificate rotation events (success/failure)"))
mProcessStartTime, _ = meter.Float64ObservableGauge("process_start_time_seconds",
metric.WithDescription("Unix timestamp of the process start time"),
metric.WithUnit("s"))
if mProcessStartTime != nil {
if _, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
o.ObserveFloat64(mProcessStartTime, processStartUnix)
return nil
}, mProcessStartTime); err != nil {
otel.Handle(err)
}
}
return nil
}
@@ -191,6 +217,10 @@ func registerBuildWSProxyInstruments() error {
mWSSessionDuration, _ = meter.Float64Histogram("newt_websocket_session_duration_seconds",
metric.WithDescription("Duration of established WebSocket sessions"),
metric.WithUnit("s"))
mWSConnected, _ = meter.Int64ObservableGauge("newt_websocket_connected",
metric.WithDescription("WebSocket connection state (1=connected, 0=disconnected)"))
mWSReconnects, _ = meter.Int64Counter("newt_websocket_reconnects_total",
metric.WithDescription("WebSocket reconnect attempts by reason"))
// Proxy
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
metric.WithDescription("Proxy active connections per tunnel and protocol"))
@@ -207,6 +237,8 @@ func registerBuildWSProxyInstruments() error {
mProxyConnDuration, _ = meter.Float64Histogram("newt_proxy_connection_duration_seconds",
metric.WithDescription("Duration of completed proxy connections"),
metric.WithUnit("s"))
mProxyConnectionsTotal, _ = meter.Int64Counter("newt_proxy_connections_total",
metric.WithDescription("Proxy connection lifecycle events by protocol"))
// Register a default callback for build info if version/commit set
reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if buildVersion == "" && buildCommit == "" {
@@ -219,7 +251,9 @@ func registerBuildWSProxyInstruments() error {
if buildCommit != "" {
attrs = append(attrs, attribute.String("commit", buildCommit))
}
if ShouldIncludeSiteLabels() {
attrs = append(attrs, siteAttrs()...)
}
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
return nil
}, mBuildInfo)
@@ -229,6 +263,17 @@ func registerBuildWSProxyInstruments() error {
// Provide a functional stopper that unregisters the callback
obsStopper = func() { _ = reg.Unregister() }
}
if mWSConnected != nil {
if regConn, err := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
val := wsConnectedState.Load()
o.ObserveInt64(mWSConnected, val, metric.WithAttributes(attrsWithSite()...))
return nil
}, mWSConnected); err != nil {
otel.Handle(err)
} else {
wsConnStopper = func() { _ = regConn.Unregister() }
}
}
return nil
}
@@ -241,6 +286,7 @@ var (
obsStopper func()
proxyObsOnce sync.Once
proxyStopper func()
wsConnStopper func()
)
// SetObservableCallback registers a single callback that will be invoked
@@ -251,7 +297,7 @@ var (
//
// telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error {
// o.ObserveInt64(mSiteOnline, 1)
// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds())
// o.ObserveFloat64(mSiteLastHeartbeat, float64(lastHB.Unix()))
// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)))
// return nil
// })
@@ -290,8 +336,6 @@ func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error)
func RegisterBuildInfo(version, commit string) {
buildVersion = version
buildCommit = commit
// Increment restart count on boot
mRestartCount.Add(context.Background(), 1)
}
// Config reloads
@@ -358,6 +402,25 @@ func IncWSKeepaliveFailure(ctx context.Context, reason string) {
)...))
}
// SetWSConnectionState updates the backing gauge for the WebSocket connected state.
func SetWSConnectionState(connected bool) {
if connected {
wsConnectedState.Store(1)
} else {
wsConnectedState.Store(0)
}
}
// IncWSReconnect increments the WebSocket reconnect counter with a bounded reason label.
func IncWSReconnect(ctx context.Context, reason string) {
if reason == "" {
reason = "unknown"
}
mWSReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
attribute.String("reason", reason),
)...))
}
func ObserveWSSessionDuration(ctx context.Context, seconds float64, result string) {
mWSSessionDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
attribute.String("result", result),
@@ -413,6 +476,21 @@ func ObserveProxyConnectionDuration(ctx context.Context, tunnelID, protocol, res
mProxyConnDuration.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...))
}
// IncProxyConnectionEvent records proxy connection lifecycle events (opened/closed).
func IncProxyConnectionEvent(ctx context.Context, tunnelID, protocol, event string) {
if event == "" {
event = "unknown"
}
attrs := []attribute.KeyValue{
attribute.String("protocol", protocol),
attribute.String("event", event),
}
if ShouldIncludeTunnelID() && tunnelID != "" {
attrs = append(attrs, attribute.String("tunnel_id", tunnelID))
}
mProxyConnectionsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...))
}
// --- Config/PKI helpers ---
func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) {

View File

@@ -0,0 +1,59 @@
package telemetry
import (
"sync"
"time"
)
func resetMetricsForTest() {
initOnce = sync.Once{}
obsOnce = sync.Once{}
proxyObsOnce = sync.Once{}
obsStopper = nil
proxyStopper = nil
if wsConnStopper != nil {
wsConnStopper()
}
wsConnStopper = nil
meter = nil
mSiteRegistrations = nil
mSiteOnline = nil
mSiteLastHeartbeat = nil
mTunnelSessions = nil
mTunnelBytes = nil
mTunnelLatency = nil
mReconnects = nil
mConnAttempts = nil
mConnErrors = nil
mConfigReloads = nil
mConfigApply = nil
mCertRotationTotal = nil
mProcessStartTime = nil
mBuildInfo = nil
mWSConnectLatency = nil
mWSMessages = nil
mWSDisconnects = nil
mWSKeepaliveFailure = nil
mWSSessionDuration = nil
mWSConnected = nil
mWSReconnects = nil
mProxyActiveConns = nil
mProxyBufferBytes = nil
mProxyAsyncBacklogByte = nil
mProxyDropsTotal = nil
mProxyAcceptsTotal = nil
mProxyConnDuration = nil
mProxyConnectionsTotal = nil
processStartUnix = float64(time.Now().UnixNano()) / 1e9
wsConnectedState.Store(0)
includeTunnelIDVal.Store(false)
includeSiteLabelVal.Store(false)
}

View File

@@ -62,8 +62,8 @@ func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) {
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
if t, ok := sv.LastHeartbeat(siteID); ok {
secs := time.Since(t).Seconds()
o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(
ts := float64(t.UnixNano()) / 1e9
o.ObserveFloat64(mSiteLastHeartbeat, ts, metric.WithAttributes(
attribute.String("site_id", siteID),
))
}

View File

@@ -118,6 +118,11 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
} else {
includeTunnelIDVal.Store(false)
}
if getenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true") == "true" {
includeSiteLabelVal.Store(true)
} else {
includeSiteLabelVal.Store(false)
}
res := buildResource(ctx, cfg)
UpdateSiteInfo(cfg.SiteID, cfg.Region)
@@ -294,7 +299,10 @@ func parseResourceAttributes(s string) map[string]string {
// Global site/region used to enrich metric labels.
var siteIDVal atomic.Value
var regionVal atomic.Value
var includeTunnelIDVal atomic.Value // bool; default true
var (
includeTunnelIDVal atomic.Value // bool; default true
includeSiteLabelVal atomic.Value // bool; default false
)
// UpdateSiteInfo updates the global site_id and region used for metric labels.
// Thread-safe via atomic.Value: subsequent metric emissions will include
@@ -335,7 +343,12 @@ func siteAttrs() []attribute.KeyValue {
}
// SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager).
func SiteLabelKVs() []attribute.KeyValue { return siteAttrs() }
func SiteLabelKVs() []attribute.KeyValue {
if !ShouldIncludeSiteLabels() {
return nil
}
return siteAttrs()
}
// ShouldIncludeTunnelID returns whether tunnel_id labels should be emitted.
func ShouldIncludeTunnelID() bool {
@@ -345,6 +358,15 @@ func ShouldIncludeTunnelID() bool {
return true
}
// ShouldIncludeSiteLabels returns whether site_id/region should be emitted as
// metric labels in addition to resource attributes.
func ShouldIncludeSiteLabels() bool {
if v, ok := includeSiteLabelVal.Load().(bool); ok {
return v
}
return false
}
func getenv(k, d string) string {
if v := os.Getenv(k); v != "" {
return v

View File

@@ -15,12 +15,18 @@ import (
// Test that disallowed attributes are filtered from the exposition.
func TestAttributeFilterDropsUnknownKeys(t *testing.T) {
ctx := context.Background()
resetMetricsForTest()
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
tel, err := Init(ctx, cfg)
if err != nil { t.Fatalf("init: %v", err) }
if err != nil {
t.Fatalf("init: %v", err)
}
defer func() { _ = tel.Shutdown(context.Background()) }()
if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") }
if tel.PrometheusHandler == nil {
t.Fatalf("prom handler nil")
}
ts := httptest.NewServer(tel.PrometheusHandler)
defer ts.Close()
@@ -32,7 +38,9 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
time.Sleep(50 * time.Millisecond)
resp, err := http.Get(ts.URL)
if err != nil { t.Fatalf("GET: %v", err) }
if err != nil {
t.Fatalf("GET: %v", err)
}
defer resp.Body.Close()
b, _ := io.ReadAll(resp.Body)
body := string(b)
@@ -43,4 +51,3 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0"}
t.Fatalf("expected allowed attribute site_id to be present in metrics, got: %s", body)
}
}

View File

@@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"
@@ -15,36 +16,61 @@ import (
// Golden test that /metrics contains expected metric names.
func TestMetricsGoldenContains(t *testing.T) {
ctx := context.Background()
resetMetricsForTest()
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", BuildVersion: "test"}
tel, err := Init(ctx, cfg)
if err != nil { t.Fatalf("telemetry init error: %v", err) }
if err != nil {
t.Fatalf("telemetry init error: %v", err)
}
defer func() { _ = tel.Shutdown(context.Background()) }()
if tel.PrometheusHandler == nil { t.Fatalf("prom handler nil") }
if tel.PrometheusHandler == nil {
t.Fatalf("prom handler nil")
}
ts := httptest.NewServer(tel.PrometheusHandler)
defer ts.Close()
// Trigger a counter
// Trigger counters to ensure they appear in the scrape
IncConnAttempt(ctx, "websocket", "success")
IncWSReconnect(ctx, "io_error")
IncProxyConnectionEvent(ctx, "", "tcp", ProxyConnectionOpened)
if tel.MeterProvider != nil {
_ = tel.MeterProvider.ForceFlush(ctx)
}
time.Sleep(100 * time.Millisecond)
var body string
for i := 0; i < 5; i++ {
resp, err := http.Get(ts.URL)
if err != nil { t.Fatalf("GET metrics failed: %v", err) }
defer resp.Body.Close()
if err != nil {
t.Fatalf("GET metrics failed: %v", err)
}
b, _ := io.ReadAll(resp.Body)
body := string(b)
_ = resp.Body.Close()
body = string(b)
if strings.Contains(body, "newt_connection_attempts_total") {
break
}
time.Sleep(100 * time.Millisecond)
}
f, err := os.Open("internal/telemetry/testdata/expected_contains.golden")
if err != nil { t.Fatalf("read golden: %v", err) }
f, err := os.Open(filepath.Join("testdata", "expected_contains.golden"))
if err != nil {
t.Fatalf("read golden: %v", err)
}
defer f.Close()
s := bufio.NewScanner(f)
for s.Scan() {
needle := strings.TrimSpace(s.Text())
if needle == "" { continue }
if needle == "" {
continue
}
if !strings.Contains(body, needle) {
t.Fatalf("expected metrics body to contain %q. body=\n%s", needle, body)
}
}
if err := s.Err(); err != nil { t.Fatalf("scan golden: %v", err) }
if err := s.Err(); err != nil {
t.Fatalf("scan golden: %v", err)
}
}

View File

@@ -13,6 +13,8 @@ import (
// Smoke test that /metrics contains at least one newt_* metric when Prom exporter is enabled.
func TestMetricsSmoke(t *testing.T) {
ctx := context.Background()
resetMetricsForTest()
t.Setenv("NEWT_METRICS_INCLUDE_SITE_LABELS", "true")
cfg := Config{
ServiceName: "newt",
PromEnabled: true,
@@ -37,18 +39,27 @@ func TestMetricsSmoke(t *testing.T) {
// Record a simple metric and then fetch /metrics
IncConnAttempt(ctx, "websocket", "success")
if tel.MeterProvider != nil {
_ = tel.MeterProvider.ForceFlush(ctx)
}
// Give the exporter a tick to collect
time.Sleep(100 * time.Millisecond)
var body string
for i := 0; i < 5; i++ {
resp, err := http.Get(ts.URL)
if err != nil {
t.Fatalf("GET /metrics failed: %v", err)
}
defer resp.Body.Close()
b, _ := io.ReadAll(resp.Body)
body := string(b)
_ = resp.Body.Close()
body = string(b)
if strings.Contains(body, "newt_connection_attempts_total") {
break
}
time.Sleep(100 * time.Millisecond)
}
if !strings.Contains(body, "newt_connection_attempts_total") {
t.Fatalf("expected newt_connection_attempts_total in metrics, got:\n%s", body)
}
}

View File

@@ -1,3 +1,7 @@
newt_connection_attempts_total
newt_websocket_connected
newt_websocket_reconnects_total
newt_proxy_connections_total
newt_build_info
process_start_time_seconds

View File

@@ -509,6 +509,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
tunnelID := pm.currentTunnelID
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "success", "")
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionOpened)
if tunnelID != "" {
state.Global().IncSessions(tunnelID)
if e := pm.getEntry(tunnelID); e != nil {
@@ -523,6 +524,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
logger.Error("Error connecting to target: %v", err)
accepted.Close()
telemetry.IncProxyAccept(context.Background(), tunnelID, "tcp", "failure", classifyProxyError(err))
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed)
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "failure", time.Since(connStart).Seconds())
return
}
@@ -556,6 +558,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
}
}
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "tcp", "success", time.Since(connStart).Seconds())
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "tcp", telemetry.ProxyConnectionClosed)
}(tunnelID, conn)
}
}
@@ -631,6 +634,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}
tunnelID := pm.currentTunnelID
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
// Only increment activeUDP after a successful DialUDP
if e := pm.getEntry(tunnelID); e != nil {
e.activeUDP.Add(1)
@@ -655,6 +659,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}
clientsMutex.Unlock()
telemetry.ObserveProxyConnectionDuration(context.Background(), tunnelID, "udp", result, time.Since(start).Seconds())
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed)
}()
buffer := make([]byte, 65507)

View File

@@ -20,7 +20,7 @@ probe "newt_* presence" "^newt_" || true
# Site gauges with site_id
probe "site_online with site_id" "^newt_site_online\{.*site_id=\"[^\"]+\"" || true
probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_seconds\{.*site_id=\"[^\"]+\"" || true
probe "last_heartbeat with site_id" "^newt_site_last_heartbeat_timestamp_seconds\{.*site_id=\"[^\"]+\"" || true
# Bytes with direction ingress/egress and protocol
probe "tunnel bytes ingress" "^newt_tunnel_bytes_total\{.*direction=\"ingress\".*protocol=\"(tcp|udp)\"" || true
@@ -39,11 +39,14 @@ fi
# WebSocket metrics (when OTLP/WS used)
probe "websocket connect latency buckets" "^newt_websocket_connect_latency_seconds_bucket" || true
probe "websocket messages total" "^newt_websocket_messages_total\{.*(direction|msg_type)=" || true
probe "websocket connected gauge" "^newt_websocket_connected" || true
probe "websocket reconnects total" "^newt_websocket_reconnects_total\{" || true
# Proxy metrics (when proxy active)
probe "proxy active connections" "^newt_proxy_active_connections\{" || true
probe "proxy buffer bytes" "^newt_proxy_buffer_bytes\{" || true
probe "proxy drops total" "^newt_proxy_drops_total\{" || true
probe "proxy connections total" "^newt_proxy_connections_total\{" || true
# Config apply
probe "config apply seconds buckets" "^newt_config_apply_seconds_bucket\{" || true

View File

@@ -167,6 +167,7 @@ func (c *Client) Close() error {
// Set connection status to false
c.setConnected(false)
telemetry.SetWSConnectionState(false)
// Close the WebSocket connection gracefully
if c.conn != nil {
@@ -508,6 +509,7 @@ func (c *Client) establishConnection() error {
} else {
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError)
}
telemetry.IncWSReconnect(ctx, etype)
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
@@ -515,6 +517,7 @@ func (c *Client) establishConnection() error {
telemetry.ObserveWSConnectLatency(ctx, lat, "success", "")
c.conn = conn
c.setConnected(true)
telemetry.SetWSConnectionState(true)
c.setMetricsContext(ctx)
sessionStart := time.Now()
// Wire up pong handler for metrics
@@ -632,6 +635,7 @@ func (c *Client) pingMonitor() {
default:
logger.Error("Ping failed: %v", err)
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
telemetry.IncWSReconnect(c.metricsContext(), "ping_write")
c.reconnect()
return
}
@@ -660,6 +664,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
// Shutting down, don't reconnect
return
default:
telemetry.IncWSReconnect(ctx, disconnectReason)
c.reconnect()
}
}()
@@ -710,6 +715,7 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
func (c *Client) reconnect() {
c.setConnected(false)
telemetry.SetWSConnectionState(false)
if c.conn != nil {
c.conn.Close()
c.conn = nil