refactor: Simplify telemetry metrics by removing site_id and enhancing tunnel_id usage

This commit is contained in:
Marc Schäfer
2025-10-07 18:43:09 +02:00
parent f8fd8e1bc5
commit a86b14d97d
10 changed files with 53 additions and 54 deletions

View File

@@ -13,8 +13,8 @@ import (
// low-cardinality label guidance from the issue description.
//
// Counters end with _total, durations are in seconds, sizes in bytes.
// Only low-cardinality stable labels are supported: site_id, tunnel_id,
// transport, direction, result, reason, error_type, region.
// Only low-cardinality stable labels are supported: tunnel_id,
// transport, direction, result, reason, error_type.
var (
initOnce sync.Once
@@ -147,9 +147,9 @@ var (
// Example inside your code (where you have access to current state):
//
// telemetry.SetObservableCallback(func(ctx context.Context, o metric.Observer) error {
// o.ObserveInt64(mSiteOnline, 1, attribute.String("site_id", siteID))
// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds(), attribute.String("site_id", siteID))
// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)), attribute.String("site_id", siteID))
// o.ObserveInt64(mSiteOnline, 1)
// o.ObserveFloat64(mSiteLastHeartbeat, time.Since(lastHB).Seconds())
// o.ObserveInt64(mTunnelSessions, int64(len(activeSessions)))
// return nil
// })
func SetObservableCallback(cb func(context.Context, metric.Observer) error) {
@@ -174,20 +174,15 @@ func IncConfigReload(ctx context.Context, result string) {
// Helpers for counters/histograms
func IncSiteRegistration(ctx context.Context, siteID, region, result string) {
func IncSiteRegistration(ctx context.Context, result string) {
attrs := []attribute.KeyValue{
attribute.String("site_id", siteID),
attribute.String("result", result),
}
if region != "" {
attrs = append(attrs, attribute.String("region", region))
}
mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrs...))
}
func AddTunnelBytes(ctx context.Context, siteID, tunnelID, direction string, n int64) {
func AddTunnelBytes(ctx context.Context, tunnelID, direction string, n int64) {
mTunnelBytes.Add(ctx, n, metric.WithAttributes(
attribute.String("site_id", siteID),
attribute.String("tunnel_id", tunnelID),
attribute.String("direction", direction),
))
@@ -198,33 +193,29 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) {
mTunnelBytes.Add(ctx, n, metric.WithAttributeSet(attrs))
}
func ObserveTunnelLatency(ctx context.Context, siteID, tunnelID, transport string, seconds float64) {
func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) {
mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(
attribute.String("site_id", siteID),
attribute.String("tunnel_id", tunnelID),
attribute.String("transport", transport),
))
}
func IncReconnect(ctx context.Context, siteID, tunnelID, reason string) {
func IncReconnect(ctx context.Context, tunnelID, reason string) {
mReconnects.Add(ctx, 1, metric.WithAttributes(
attribute.String("site_id", siteID),
attribute.String("tunnel_id", tunnelID),
attribute.String("reason", reason),
))
}
func IncConnAttempt(ctx context.Context, siteID, transport, result string) {
func IncConnAttempt(ctx context.Context, transport, result string) {
mConnAttempts.Add(ctx, 1, metric.WithAttributes(
attribute.String("site_id", siteID),
attribute.String("transport", transport),
attribute.String("result", result),
))
}
func IncConnError(ctx context.Context, siteID, transport, typ string) {
func IncConnError(ctx context.Context, transport, typ string) {
mConnErrors.Add(ctx, 1, metric.WithAttributes(
attribute.String("site_id", siteID),
attribute.String("transport", transport),
attribute.String("error_type", typ),
))

View File

@@ -42,16 +42,19 @@ func RegisterStateView(v StateView) {
if online {
val = 1
}
o.ObserveInt64(mSiteOnline, val, metric.WithAttributes(attribute.String("site_id", siteID)))
o.ObserveInt64(mSiteOnline, val)
}
if t, ok := sv.LastHeartbeat(siteID); ok {
secs := time.Since(t).Seconds()
o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(attribute.String("site_id", siteID)))
o.ObserveFloat64(mSiteLastHeartbeat, secs)
}
// If the view supports per-tunnel sessions, report them labeled by tunnel_id.
if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok {
for tid, n := range tm.SessionsByTunnel() {
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attribute.String("tunnel_id", tid)))
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(
attribute.String("tunnel_id", tid),
attribute.String("transport", "tcp"),
))
}
}
}

View File

@@ -93,16 +93,21 @@ type Setup struct {
// installs recommended histogram views for *_latency_seconds, and returns a Setup with
// a Shutdown method to flush exporters.
func Init(ctx context.Context, cfg Config) (*Setup, error) {
// Build resource with required attributes and only include optional ones when non-empty
attrs := []attribute.KeyValue{
semconv.ServiceName(cfg.ServiceName),
semconv.ServiceVersion(cfg.ServiceVersion),
}
if cfg.SiteID != "" {
attrs = append(attrs, attribute.String("site_id", cfg.SiteID))
}
if cfg.Region != "" {
attrs = append(attrs, attribute.String("region", cfg.Region))
}
res, _ := resource.New(ctx,
resource.WithFromEnv(),
resource.WithHost(),
resource.WithAttributes(
semconv.ServiceName(cfg.ServiceName),
semconv.ServiceVersion(cfg.ServiceVersion),
// Optional resource attributes
attribute.String("site_id", cfg.SiteID),
attribute.String("region", cfg.Region),
),
resource.WithAttributes(attrs...),
)
s := &Setup{}
@@ -168,7 +173,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
AttributeFilter: func(kv attribute.KeyValue) bool {
k := string(kv.Key)
switch k {
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type":
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit":
return true
default:
return false

View File

@@ -25,7 +25,7 @@ cfg := Config{ServiceName: "newt", PromEnabled: true, AdminAddr: "127.0.0.1:0",
defer ts.Close()
// Trigger a counter
IncConnAttempt(ctx, "ignored", "websocket", "success")
IncConnAttempt(ctx, "websocket", "success")
time.Sleep(100 * time.Millisecond)
resp, err := http.Get(ts.URL)

View File

@@ -36,7 +36,7 @@ func TestMetricsSmoke(t *testing.T) {
defer ts.Close()
// Record a simple metric and then fetch /metrics
IncConnAttempt(ctx, "site-1", "websocket", "success")
IncConnAttempt(ctx, "websocket", "success")
// Give the exporter a tick to collect
time.Sleep(100 * time.Millisecond)

View File

@@ -666,7 +666,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
logger.Debug("Testing initial connection with reliable ping...")
lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
if err == nil && wgData.PublicKey != "" {
telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds())
telemetry.ObserveTunnelLatency(context.Background(), wgData.PublicKey, "wireguard", lat.Seconds())
}
if err != nil {
logger.Warn("Initial reliable ping failed, but continuing: %v", err)
@@ -692,7 +692,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
connected = true
// telemetry: record a successful site registration (omit region unless available)
telemetry.IncSiteRegistration(context.Background(), id, "", "success")
telemetry.IncSiteRegistration(context.Background(), "success")
// add the targets if there are any
if len(wgData.Targets.TCP) > 0 {
@@ -728,7 +728,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) {
logger.Info("Received reconnect message")
if wgData.PublicKey != "" {
telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request")
}
// Close the WireGuard device and TUN
@@ -763,7 +763,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) {
logger.Info("Received termination message")
if wgData.PublicKey != "" {
telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request")
}
// Close the WireGuard device and TUN

View File

@@ -295,7 +295,7 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
connectionLost = true
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
if tunnelID != "" {
telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout)
telemetry.IncReconnect(context.Background(), tunnelID, telemetry.ReasonTimeout)
}
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
// Send registration message to the server for backward compatibility
@@ -325,7 +325,7 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
recentLatencies = append(recentLatencies, latency)
// Record tunnel latency (limit sampling to this periodic check)
if tunnelID != "" {
telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds())
telemetry.ObserveTunnelLatency(context.Background(), tunnelID, "wireguard", latency.Seconds())
}
if len(recentLatencies) > 10 {
recentLatencies = recentLatencies[1:]

View File

@@ -291,7 +291,7 @@ func (c *Client) getToken() (string, error) {
}
resp, err := client.Do(req)
if err != nil {
telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err))
telemetry.IncConnError(context.Background(), "auth", classifyConnError(err))
return "", fmt.Errorf("failed to request new token: %w", err)
}
defer resp.Body.Close()
@@ -299,17 +299,17 @@ func (c *Client) getToken() (string, error) {
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure")
telemetry.IncConnAttempt(context.Background(), "auth", "failure")
bin := "http_other"
if resp.StatusCode >= 500 {
bin = "http_5xx"
} else if resp.StatusCode >= 400 {
bin = "http_4xx"
}
telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin)
telemetry.IncConnError(context.Background(), "auth", bin)
// Reconnect reason mapping for auth failures
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError)
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonAuthError)
}
return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
}
@@ -329,7 +329,7 @@ func (c *Client) getToken() (string, error) {
}
logger.Debug("Received token: %s", tokenResp.Data.Token)
telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success")
telemetry.IncConnAttempt(context.Background(), "auth", "success")
return tokenResp.Data.Token, nil
}
@@ -379,8 +379,8 @@ func (c *Client) establishConnection() error {
if err != nil {
// telemetry: connection attempt failed before dialing
// site_id isn't globally available here; use client ID as site_id (low cardinality)
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err))
telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
telemetry.IncConnError(context.Background(), "websocket", classifyConnError(err))
return fmt.Errorf("failed to get token: %w", err)
}
@@ -441,21 +441,21 @@ func (c *Client) establishConnection() error {
conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
if err != nil {
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
etype := classifyConnError(err)
telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype)
telemetry.IncConnError(context.Background(), "websocket", etype)
// Map handshake-related errors to reconnect reasons where appropriate
if etype == "tls" {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError)
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonHandshakeError)
} else if etype == "timeout" {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout)
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonTimeout)
} else {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError)
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonError)
}
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success")
telemetry.IncConnAttempt(context.Background(), "websocket", "success")
c.conn = conn
c.setConnected(true)

View File

@@ -306,7 +306,7 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
telemetry.IncConfigReload(context.Background(), "success")
// Optional reconnect reason mapping: config change
if s.serverPubKey != "" {
telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange)
telemetry.IncReconnect(context.Background(), s.serverPubKey, telemetry.ReasonConfigChange)
}
// Ensure the WireGuard interface and peers are configured

View File

@@ -246,7 +246,7 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str
// ReportRTT allows reporting native RTTs to telemetry, rate-limited externally.
func (s *WireGuardService) ReportRTT(seconds float64) {
if s.serverPubKey == "" { return }
telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds)
telemetry.ObserveTunnelLatency(context.Background(), s.serverPubKey, "wireguard", seconds)
}
func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) {