diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 130fbd3..bd163ca 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -13,8 +13,8 @@ import ( // low-cardinality label guidance from the issue description. // // Counters end with _total, durations are in seconds, sizes in bytes. -// Only low-cardinality stable labels are supported: site_id, tunnel_id, -// transport, direction, result, reason, error_type, region. +// Only low-cardinality stable labels are supported: tunnel_id, +// transport, direction, result, reason, error_type. var ( initOnce sync.Once @@ -147,9 +147,9 @@ var ( // Example inside your code (where you have access to current state): // // telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error { -// o.ObserveInt64(mSiteOnline, 1, attribute.String("site_id", siteID)) -// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds(), attribute.String("site_id", siteID)) -// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)), attribute.String("site_id", siteID)) +// o.ObserveInt64(mSiteOnline, 1) +// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds()) +// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions))) // return nil // }) func SetObservableCallback(cb func(context.Context, metric.Observer) error) { @@ -174,20 +174,15 @@ func IncConfigReload(ctx context.Context, result string) { // Helpers for counters/histograms -func IncSiteRegistration(ctx context.Context, siteID, region, result string) { +func IncSiteRegistration(ctx context.Context, result string) { attrs := []attribute.KeyValue{ - attribute.String("site_id", siteID), attribute.String("result", result), } - if region != "" { - attrs = append(attrs, attribute.String("region", region)) - } mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrs...)) } -func AddTunnelBytes(ctx context.Context, siteID, tunnelID, direction string, n int64) { +func AddTunnelBytes(ctx context.Context, tunnelID, direction string, n int64) { mTunnelBytes.Add(ctx, n, metric.WithAttributes( - attribute.String("site_id", siteID), attribute.String("tunnel_id", tunnelID), attribute.String("direction", direction), )) @@ -198,33 +193,29 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) { mTunnelBytes.Add(ctx, n, metric.WithAttributeSet(attrs)) } -func ObserveTunnelLatency(ctx context.Context, siteID, tunnelID, transport string, seconds float64) { +func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) { mTunnelLatency.Record(ctx, seconds, metric.WithAttributes( - attribute.String("site_id", siteID), attribute.String("tunnel_id", tunnelID), attribute.String("transport", transport), )) } -func IncReconnect(ctx context.Context, siteID, tunnelID, reason string) { +func IncReconnect(ctx context.Context, tunnelID, reason string) { mReconnects.Add(ctx, 1, metric.WithAttributes( - attribute.String("site_id", siteID), attribute.String("tunnel_id", tunnelID), attribute.String("reason", reason), )) } -func IncConnAttempt(ctx context.Context, siteID, transport, result string) { +func IncConnAttempt(ctx context.Context, transport, result string) { mConnAttempts.Add(ctx, 1, metric.WithAttributes( - attribute.String("site_id", siteID), attribute.String("transport", transport), attribute.String("result", result), )) } -func IncConnError(ctx context.Context, siteID, transport, typ string) { +func IncConnError(ctx context.Context, transport, typ string) { mConnErrors.Add(ctx, 1, metric.WithAttributes( - attribute.String("site_id", siteID), attribute.String("transport", transport), attribute.String("error_type", typ), )) diff --git a/internal/telemetry/state_view.go b/internal/telemetry/state_view.go index 4c97ddf..ec3f529 100644 --- a/internal/telemetry/state_view.go +++ b/internal/telemetry/state_view.go @@ -42,16 +42,19 @@ func RegisterStateView(v StateView) { if online { val = 1 } - o.ObserveInt64(mSiteOnline, val, metric.WithAttributes(attribute.String("site_id", siteID))) + o.ObserveInt64(mSiteOnline, val) } if t, ok := sv.LastHeartbeat(siteID); ok { secs := time.Since(t).Seconds() - o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(attribute.String("site_id", siteID))) + o.ObserveFloat64(mSiteLastHeartbeat, secs) } // If the view supports per-tunnel sessions, report them labeled by tunnel_id. if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok { for tid, n := range tm.SessionsByTunnel() { - o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attribute.String("tunnel_id", tid))) + o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes( + attribute.String("tunnel_id", tid), + attribute.String("transport", "tcp"), + )) } } } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 20a25c0..d54e4d8 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -93,16 +93,21 @@ type Setup struct { // installs recommended histogram views for *_latency_seconds, and returns a Setup with // a Shutdown method to flush exporters. func Init(ctx context.Context, cfg Config) (*Setup, error) { + // Build resource with required attributes and only include optional ones when non-empty + attrs := []attribute.KeyValue{ + semconv.ServiceName(cfg.ServiceName), + semconv.ServiceVersion(cfg.ServiceVersion), + } + if cfg.SiteID != "" { + attrs = append(attrs, attribute.String("site_id", cfg.SiteID)) + } + if cfg.Region != "" { + attrs = append(attrs, attribute.String("region", cfg.Region)) + } res, _ := resource.New(ctx, resource.WithFromEnv(), resource.WithHost(), - resource.WithAttributes( - semconv.ServiceName(cfg.ServiceName), - semconv.ServiceVersion(cfg.ServiceVersion), - // Optional resource attributes - attribute.String("site_id", cfg.SiteID), - attribute.String("region", cfg.Region), - ), + resource.WithAttributes(attrs...), ) s := &Setup{} @@ -168,7 +173,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { AttributeFilter: func(kv attribute.KeyValue) bool { k := string(kv.Key) switch k { - case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type": + case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit": return true default: return false diff --git a/internal/telemetry/telemetry_golden_test.go b/internal/telemetry/telemetry_golden_test.go index 91dcbd2..3e6f896 100644 --- a/internal/telemetry/telemetry_golden_test.go +++ b/internal/telemetry/telemetry_golden_test.go @@ -25,7 +25,7 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0", defer ts.Close() // Trigger a counter - IncConnAttempt(ctx, "ignored", "websocket", "success") + IncConnAttempt(ctx, "websocket", "success") time.Sleep(100 * time.Millisecond) resp, err := http.Get(ts.URL) diff --git a/internal/telemetry/telemetry_smoke_test.go b/internal/telemetry/telemetry_smoke_test.go index b820af1..d51ea8e 100644 --- a/internal/telemetry/telemetry_smoke_test.go +++ b/internal/telemetry/telemetry_smoke_test.go @@ -36,7 +36,7 @@ func TestMetricsSmoke(t *testing.T) { defer ts.Close() // Record a simple metric and then fetch /metrics - IncConnAttempt(ctx, "site-1", "websocket", "success") + IncConnAttempt(ctx, "websocket", "success") // Give the exporter a tick to collect time.Sleep(100 * time.Millisecond) diff --git a/main.go b/main.go index 025967a..d3624c8 100644 --- a/main.go +++ b/main.go @@ -666,7 +666,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Testing initial connection with reliable ping...") lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) if err == nil && wgData.PublicKey != "" { - telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds()) + telemetry.ObserveTunnelLatency(context.Background(), wgData.PublicKey, "wireguard", lat.Seconds()) } if err != nil { logger.Warn("Initial reliable ping failed, but continuing: %v", err) @@ -692,7 +692,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub connected = true // telemetry: record a successful site registration (omit region unless available) - telemetry.IncSiteRegistration(context.Background(), id, "", "success") + telemetry.IncSiteRegistration(context.Background(), "success") // add the targets if there are any if len(wgData.Targets.TCP) > 0 { @@ -728,7 +728,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { logger.Info("Received reconnect message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") + telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request") } // Close the WireGuard device and TUN @@ -763,7 +763,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { logger.Info("Received termination message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") + telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request") } // Close the WireGuard device and TUN diff --git a/util.go b/util.go index dc19388..25cdb9d 100644 --- a/util.go +++ b/util.go @@ -295,7 +295,7 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien connectionLost = true logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) if tunnelID != "" { - telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) + telemetry.IncReconnect(context.Background(), tunnelID, telemetry.ReasonTimeout) } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility @@ -325,7 +325,7 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien recentLatencies = append(recentLatencies, latency) // Record tunnel latency (limit sampling to this periodic check) if tunnelID != "" { - telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) + telemetry.ObserveTunnelLatency(context.Background(), tunnelID, "wireguard", latency.Seconds()) } if len(recentLatencies) > 10 { recentLatencies = recentLatencies[1:] diff --git a/websocket/client.go b/websocket/client.go index c9ac264..e38a6c9 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -291,7 +291,7 @@ func (c *Client) getToken() (string, error) { } resp, err := client.Do(req) if err != nil { - telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err)) + telemetry.IncConnError(context.Background(), "auth", classifyConnError(err)) return "", fmt.Errorf("failed to request new token: %w", err) } defer resp.Body.Close() @@ -299,17 +299,17 @@ func (c *Client) getToken() (string, error) { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) - telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") + telemetry.IncConnAttempt(context.Background(), "auth", "failure") bin := "http_other" if resp.StatusCode >= 500 { bin = "http_5xx" } else if resp.StatusCode >= 400 { bin = "http_4xx" } - telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) + telemetry.IncConnError(context.Background(), "auth", bin) // Reconnect reason mapping for auth failures if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { - telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) + telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonAuthError) } return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) } @@ -329,7 +329,7 @@ func (c *Client) getToken() (string, error) { } logger.Debug("Received token: %s", tokenResp.Data.Token) - telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") + telemetry.IncConnAttempt(context.Background(), "auth", "success") return tokenResp.Data.Token, nil } @@ -379,8 +379,8 @@ func (c *Client) establishConnection() error { if err != nil { // telemetry: connection attempt failed before dialing // site_id isn't globally available here; use client ID as site_id (low cardinality) - telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") - telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) + telemetry.IncConnAttempt(context.Background(), "websocket", "failure") + telemetry.IncConnError(context.Background(), "websocket", classifyConnError(err)) return fmt.Errorf("failed to get token: %w", err) } @@ -441,21 +441,21 @@ func (c *Client) establishConnection() error { conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) if err != nil { - telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") + telemetry.IncConnAttempt(context.Background(), "websocket", "failure") etype := classifyConnError(err) - telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) + telemetry.IncConnError(context.Background(), "websocket", etype) // Map handshake-related errors to reconnect reasons where appropriate if etype == "tls" { - telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) + telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonHandshakeError) } else if etype == "timeout" { - telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) + telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonTimeout) } else { - telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) + telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonError) } return fmt.Errorf("failed to connect to WebSocket: %w", err) } - telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") + telemetry.IncConnAttempt(context.Background(), "websocket", "success") c.conn = conn c.setConnected(true) diff --git a/wg/wg.go b/wg/wg.go index adf8df6..1607427 100644 --- a/wg/wg.go +++ b/wg/wg.go @@ -306,7 +306,7 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { telemetry.IncConfigReload(context.Background(), "success") // Optional reconnect reason mapping: config change if s.serverPubKey != "" { - telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) + telemetry.IncReconnect(context.Background(), s.serverPubKey, telemetry.ReasonConfigChange) } // Ensure the WireGuard interface and peers are configured diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go index 09f160e..dd7d493 100644 --- a/wgnetstack/wgnetstack.go +++ b/wgnetstack/wgnetstack.go @@ -246,7 +246,7 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str // ReportRTT allows reporting native RTTs to telemetry, rate-limited externally. func (s *WireGuardService) ReportRTT(seconds float64) { if s.serverPubKey == "" { return } - telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds) + telemetry.ObserveTunnelLatency(context.Background(), s.serverPubKey, "wireguard", seconds) } func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) {