From 9ace45e71f388477d493eb87d9b9758d4a43290e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Sch=C3=A4fer?= Date: Wed, 8 Oct 2025 00:43:53 +0200 Subject: [PATCH] fix(metrics): direction=ingress|egress for bytes; remove transport on tunnel_sessions; extend allow-list (msg_type, phase); add units for histograms and bytes; handle callback errors; normalize error_type taxonomy; HTTP error mapping to enums --- internal/telemetry/metrics.go | 33 ++++++++++++++++++++++---------- internal/telemetry/state_view.go | 1 - internal/telemetry/telemetry.go | 4 +++- proxy/manager.go | 8 ++++---- websocket/client.go | 28 ++++++++++++--------------- 5 files changed, 42 insertions(+), 32 deletions(-) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 2b332e4..ac17fb9 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -95,12 +95,14 @@ func registerInstruments() error { return } mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total", - metric.WithDescription("Tunnel bytes in/out")) + metric.WithDescription("Tunnel bytes ingress/egress"), + metric.WithUnit("By")) if err != nil { return } mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds", - metric.WithDescription("Per-tunnel latency in seconds")) + metric.WithDescription("Per-tunnel latency in seconds"), + metric.WithUnit("s")) if err != nil { return } @@ -128,7 +130,8 @@ func registerInstruments() error { mRestartCount, _ = meter.Int64Counter("newt_restart_count_total", metric.WithDescription("Process restart count (incremented on start)")) mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds", - metric.WithDescription("Configuration apply duration in seconds")) + metric.WithDescription("Configuration apply duration in seconds"), + metric.WithUnit("s")) mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total", metric.WithDescription("Certificate rotation events (success/failure)")) @@ -138,7 +141,8 @@ func registerInstruments() error { // WebSocket mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds", - metric.WithDescription("WebSocket connect latency in seconds")) + metric.WithDescription("WebSocket connect latency in seconds"), + metric.WithUnit("s")) mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", metric.WithDescription("WebSocket messages by direction and type")) @@ -146,14 +150,16 @@ func registerInstruments() error { mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", metric.WithDescription("Proxy active connections per tunnel and protocol")) mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes", - metric.WithDescription("Proxy buffer bytes (may approximate async backlog)")) + metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"), + metric.WithUnit("By")) mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes", - metric.WithDescription("Unflushed async byte backlog per tunnel and protocol")) + metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"), + metric.WithUnit("By")) mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", metric.WithDescription("Proxy drops due to write errors")) // Register a default callback for build info if version/commit set - meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + if e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if buildVersion == "" && buildCommit == "" { return nil } @@ -167,7 +173,10 @@ func registerInstruments() error { attrs = append(attrs, siteAttrs()...) o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...)) return nil - }, mBuildInfo) + }, mBuildInfo); e != nil { + // forward to global OTel error handler; Init will continue but build_info will be missing + otel.Handle(e) + } }) return err } @@ -197,7 +206,9 @@ var ( // }) func SetObservableCallback(cb func(context.Context, metric.Observer) error) { obsOnce.Do(func() { - meter.RegisterCallback(cb, mSiteOnline, mSiteLastHeartbeat, mTunnelSessions) + if e := meter.RegisterCallback(cb, mSiteOnline, mSiteLastHeartbeat, mTunnelSessions); e != nil { + otel.Handle(e) + } obsStopper = func() { /* no-op; otel callbacks are unregistered when provider shuts down */ } }) } @@ -205,7 +216,9 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) { // SetProxyObservableCallback registers a callback to observe proxy gauges. func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) { proxyObsOnce.Do(func() { - meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte) + if e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte); e != nil { + otel.Handle(e) + } proxyStopper = func() {} }) } diff --git a/internal/telemetry/state_view.go b/internal/telemetry/state_view.go index fe57dc3..8bb22e4 100644 --- a/internal/telemetry/state_view.go +++ b/internal/telemetry/state_view.go @@ -58,7 +58,6 @@ func RegisterStateView(v StateView) { o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes( attribute.String("site_id", getSiteID()), attribute.String("tunnel_id", tid), - attribute.String("transport", "wireguard"), )) } } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 9b2a84c..7e3b819 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -195,7 +195,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { AttributeFilter: func(kv attribute.KeyValue) bool { k := string(kv.Key) switch k { - case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "version", "commit", "site_id", "region": + case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region": return true default: return false @@ -296,6 +296,8 @@ var siteIDVal atomic.Value var regionVal atomic.Value // UpdateSiteInfo updates the global site_id and region used for metric labels. +// Thread-safe via atomic.Value: subsequent metric emissions will include +// the new labels, prior emissions remain unchanged. func UpdateSiteInfo(siteID, region string) { if siteID != "" { siteIDVal.Store(siteID) diff --git a/proxy/manager.go b/proxy/manager.go index eda7389..cf15c66 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -119,22 +119,22 @@ func (pm *ProxyManager) SetTunnelID(id string) { site := telemetry.SiteLabelKVs() e.attrInTCP = attribute.NewSet(append(site, attribute.String("tunnel_id", id), - attribute.String("direction", "in"), + attribute.String("direction", "ingress"), attribute.String("protocol", "tcp"), )...) e.attrOutTCP = attribute.NewSet(append(site, attribute.String("tunnel_id", id), - attribute.String("direction", "out"), + attribute.String("direction", "egress"), attribute.String("protocol", "tcp"), )...) e.attrInUDP = attribute.NewSet(append(site, attribute.String("tunnel_id", id), - attribute.String("direction", "in"), + attribute.String("direction", "ingress"), attribute.String("protocol", "udp"), )...) e.attrOutUDP = attribute.NewSet(append(site, attribute.String("tunnel_id", id), - attribute.String("direction", "out"), + attribute.String("direction", "egress"), attribute.String("protocol", "udp"), )...) } diff --git a/websocket/client.go b/websocket/client.go index 5a7e91f..db9d810 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -304,13 +304,11 @@ func (c *Client) getToken() (string, error) { body, _ := io.ReadAll(resp.Body) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) telemetry.IncConnAttempt(context.Background(), "auth", "failure") - bin := "http_other" - if resp.StatusCode >= 500 { - bin = "http_5xx" - } else if resp.StatusCode >= 400 { - bin = "http_4xx" + etype := "io_error" + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + etype = "auth_failed" } - telemetry.IncConnError(context.Background(), "auth", bin) + telemetry.IncConnError(context.Background(), "auth", etype) // Reconnect reason mapping for auth failures if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonAuthError) @@ -338,7 +336,8 @@ func (c *Client) getToken() (string, error) { return tokenResp.Data.Token, nil } -// classifyConnError maps common errors to low-cardinality error_type labels +// classifyConnError maps to fixed, low-cardinality error_type values. +// Allowed enum: dial_timeout, tls_handshake, auth_failed, io_error func classifyConnError(err error) string { if err == nil { return "" @@ -346,17 +345,14 @@ func classifyConnError(err error) string { msg := strings.ToLower(err.Error()) switch { case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): - return "tls" - case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): - return "timeout" - case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"): - return "dns" + return "tls_handshake" + case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout") || strings.Contains(msg, "deadline exceeded"): + return "dial_timeout" case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): - return "auth" - case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"): - return "io" + return "auth_failed" default: - return "other" + // Group remaining network/socket errors as io_error to avoid label explosion + return "io_error" } }