From 1e88fb86b40ec6c774a8f1a7e7f01ddad8787ffc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Sch=C3=A4fer?= Date: Wed, 8 Oct 2025 00:09:17 +0200 Subject: [PATCH] feat(telemetry,metrics): allow site_id/region in attribute filter; read site_id from NEWT_SITE_ID/NEWT_ID or OTEL_RESOURCE_ATTRIBUTES; propagate site_id/region labels across metrics; include site labels in build_info; seed global site info --- internal/telemetry/metrics.go | 35 ++++++++----- internal/telemetry/telemetry.go | 89 +++++++++++++++++++++++++++++++-- 2 files changed, 108 insertions(+), 16 deletions(-) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index bd163ca..2f4b005 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -46,6 +46,14 @@ var ( buildCommit string ) +// attrsWithSite appends global site/region labels when present. +func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue { + attrs := make([]attribute.KeyValue, 0, len(extra)+2) + attrs = append(attrs, extra...) + attrs = append(attrs, siteAttrs()...) + return attrs +} + func registerInstruments() error { var err error initOnce.Do(func() { @@ -124,6 +132,7 @@ func registerInstruments() error { if buildCommit != "" { attrs = append(attrs, attribute.String("commit", buildCommit)) } + attrs = append(attrs, siteAttrs()...) o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...)) return nil }, mBuildInfo) @@ -169,7 +178,9 @@ func RegisterBuildInfo(version, commit string) { // Config reloads func IncConfigReload(ctx context.Context, result string) { - mConfigReloads.Add(ctx, 1, metric.WithAttributes(attribute.String("result", result))) + mConfigReloads.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("result", result), + )...)) } // Helpers for counters/histograms @@ -178,14 +189,14 @@ func IncSiteRegistration(ctx context.Context, result string) { attrs := []attribute.KeyValue{ attribute.String("result", result), } - mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrs...)) + mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) } func AddTunnelBytes(ctx context.Context, tunnelID, direction string, n int64) { - mTunnelBytes.Add(ctx, n, metric.WithAttributes( + mTunnelBytes.Add(ctx, n, metric.WithAttributes(attrsWithSite( attribute.String("tunnel_id", tunnelID), attribute.String("direction", direction), - )) + )...)) } // AddTunnelBytesSet adds bytes using a pre-built attribute.Set to avoid per-call allocations. @@ -194,29 +205,29 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) { } func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) { - mTunnelLatency.Record(ctx, seconds, metric.WithAttributes( + mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( attribute.String("tunnel_id", tunnelID), attribute.String("transport", transport), - )) + )...)) } func IncReconnect(ctx context.Context, tunnelID, reason string) { - mReconnects.Add(ctx, 1, metric.WithAttributes( + mReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite( attribute.String("tunnel_id", tunnelID), attribute.String("reason", reason), - )) + )...)) } func IncConnAttempt(ctx context.Context, transport, result string) { - mConnAttempts.Add(ctx, 1, metric.WithAttributes( + mConnAttempts.Add(ctx, 1, metric.WithAttributes(attrsWithSite( attribute.String("transport", transport), attribute.String("result", result), - )) + )...)) } func IncConnError(ctx context.Context, transport, typ string) { - mConnErrors.Add(ctx, 1, metric.WithAttributes( + mConnErrors.Add(ctx, 1, metric.WithAttributes(attrsWithSite( attribute.String("transport", transport), attribute.String("error_type", typ), - )) + )...)) } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index d54e4d8..30efd46 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -6,6 +6,7 @@ import ( "net/http" "os" "strings" + "sync/atomic" "time" promclient "github.com/prometheus/client_golang/prometheus" @@ -63,16 +64,34 @@ type Config struct { // OTEL_SERVICE_VERSION (default: "") // NEWT_ADMIN_ADDR (default: ":2112") func FromEnv() Config { + // Prefer explicit NEWT_* env vars, then fall back to OTEL_RESOURCE_ATTRIBUTES + site := getenv("NEWT_SITE_ID", "") + if site == "" { + site = getenv("NEWT_ID", "") + } + region := os.Getenv("NEWT_REGION") + if site == "" || region == "" { + if ra := os.Getenv("OTEL_RESOURCE_ATTRIBUTES"); ra != "" { + m := parseResourceAttributes(ra) + if site == "" { + site = m["site_id"] + } + if region == "" { + region = m["region"] + } + } + } return Config{ ServiceName: getenv("OTEL_SERVICE_NAME", "newt"), ServiceVersion: os.Getenv("OTEL_SERVICE_VERSION"), - Region: os.Getenv("NEWT_REGION"), + SiteID: site, + Region: region, PromEnabled: getenv("NEWT_METRICS_PROMETHEUS_ENABLED", "true") == "true", OTLPEnabled: getenv("NEWT_METRICS_OTLP_ENABLED", "false") == "true", OTLPEndpoint: getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"), OTLPInsecure: getenv("OTEL_EXPORTER_OTLP_INSECURE", "true") == "true", MetricExportInterval: getdur("OTEL_METRIC_EXPORT_INTERVAL", 15*time.Second), - AdminAddr: getenv("NEWT_ADMIN_ADDR", "127.0.0.1:2112"), + AdminAddr: getenv("NEWT_ADMIN_ADDR", "*********:2112"), } } @@ -110,6 +129,9 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { resource.WithAttributes(attrs...), ) + // Seed global site/region for label propagation + UpdateSiteInfo(cfg.SiteID, cfg.Region) + s := &Setup{} // Build metric readers/exporters @@ -166,14 +188,14 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { }, }, ))) - // Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments. +// Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments. mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView( sdkmetric.Instrument{Name: "newt_*"}, sdkmetric.Stream{ AttributeFilter: func(kv attribute.KeyValue) bool { k := string(kv.Key) switch k { - case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit": + case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit", "site_id", "region": return true default: return false @@ -253,6 +275,65 @@ func parseOTLPHeaders(h string) map[string]string { return m } +// parseResourceAttributes parses OTEL_RESOURCE_ATTRIBUTES formatted as k=v,k2=v2 +func parseResourceAttributes(s string) map[string]string { + m := map[string]string{} + if s == "" { + return m + } + parts := strings.Split(s, ",") + for _, p := range parts { + kv := strings.SplitN(strings.TrimSpace(p), "=", 2) + if len(kv) == 2 { + m[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } + } + return m +} + +// Global site/region used to enrich metric labels. +var siteIDVal atomic.Value +var regionVal atomic.Value + +// UpdateSiteInfo updates the global site_id and region used for metric labels. +func UpdateSiteInfo(siteID, region string) { + if siteID != "" { + siteIDVal.Store(siteID) + } + if region != "" { + regionVal.Store(region) + } +} + +func getSiteID() string { + if v, ok := siteIDVal.Load().(string); ok { + return v + } + return "" +} + +func getRegion() string { + if v, ok := regionVal.Load().(string); ok { + return v + } + return "" +} + +// siteAttrs returns label KVs for site_id and region (if set). +func siteAttrs() []attribute.KeyValue { + var out []attribute.KeyValue + if s := getSiteID(); s != "" { + out = append(out, attribute.String("site_id", s)) + } + if r := getRegion(); r != "" { + out = append(out, attribute.String("region", r)) + } + return out +} + +// SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager). +func SiteLabelKVs() []attribute.KeyValue { return siteAttrs() } + func getenv(k, d string) string { if v := os.Getenv(k); v != "" { return v