feat(telemetry,metrics): allow site_id/region in attribute filter; read site_id from NEWT_SITE_ID/NEWT_ID or OTEL_RESOURCE_ATTRIBUTES; propagate site_id/region labels across metrics; include site labels in build_info; seed global site info

This commit is contained in:
Marc Schäfer
2025-10-08 00:09:17 +02:00
parent 62407b0c74
commit 1e88fb86b4
2 changed files with 108 additions and 16 deletions

View File

@@ -46,6 +46,14 @@ var (
buildCommit string
)
// attrsWithSite appends global site/region labels when present.
func attrsWithSite(extra ...attribute.KeyValue) []attribute.KeyValue {
attrs := make([]attribute.KeyValue, 0, len(extra)+2)
attrs = append(attrs, extra...)
attrs = append(attrs, siteAttrs()...)
return attrs
}
func registerInstruments() error {
var err error
initOnce.Do(func() {
@@ -124,6 +132,7 @@ func registerInstruments() error {
if buildCommit != "" {
attrs = append(attrs, attribute.String("commit", buildCommit))
}
attrs = append(attrs, siteAttrs()...)
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
return nil
}, mBuildInfo)
@@ -169,7 +178,9 @@ func RegisterBuildInfo(version, commit string) {
// Config reloads
func IncConfigReload(ctx context.Context, result string) {
mConfigReloads.Add(ctx, 1, metric.WithAttributes(attribute.String("result", result)))
mConfigReloads.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
attribute.String("result", result),
)...))
}
// Helpers for counters/histograms
@@ -178,14 +189,14 @@ func IncSiteRegistration(ctx context.Context, result string) {
attrs := []attribute.KeyValue{
attribute.String("result", result),
}
mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrs...))
mSiteRegistrations.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...))
}
func AddTunnelBytes(ctx context.Context, tunnelID, direction string, n int64) {
mTunnelBytes.Add(ctx, n, metric.WithAttributes(
mTunnelBytes.Add(ctx, n, metric.WithAttributes(attrsWithSite(
attribute.String("tunnel_id", tunnelID),
attribute.String("direction", direction),
))
)...))
}
// AddTunnelBytesSet adds bytes using a pre-built attribute.Set to avoid per-call allocations.
@@ -194,29 +205,29 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) {
}
func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) {
mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(
mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
attribute.String("tunnel_id", tunnelID),
attribute.String("transport", transport),
))
)...))
}
func IncReconnect(ctx context.Context, tunnelID, reason string) {
mReconnects.Add(ctx, 1, metric.WithAttributes(
mReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
attribute.String("tunnel_id", tunnelID),
attribute.String("reason", reason),
))
)...))
}
func IncConnAttempt(ctx context.Context, transport, result string) {
mConnAttempts.Add(ctx, 1, metric.WithAttributes(
mConnAttempts.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
attribute.String("transport", transport),
attribute.String("result", result),
))
)...))
}
func IncConnError(ctx context.Context, transport, typ string) {
mConnErrors.Add(ctx, 1, metric.WithAttributes(
mConnErrors.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
attribute.String("transport", transport),
attribute.String("error_type", typ),
))
)...))
}

View File

@@ -6,6 +6,7 @@ import (
"net/http"
"os"
"strings"
"sync/atomic"
"time"
promclient "github.com/prometheus/client_golang/prometheus"
@@ -63,16 +64,34 @@ type Config struct {
// OTEL_SERVICE_VERSION (default: "")
// NEWT_ADMIN_ADDR (default: ":2112")
func FromEnv() Config {
// Prefer explicit NEWT_* env vars, then fall back to OTEL_RESOURCE_ATTRIBUTES
site := getenv("NEWT_SITE_ID", "")
if site == "" {
site = getenv("NEWT_ID", "")
}
region := os.Getenv("NEWT_REGION")
if site == "" || region == "" {
if ra := os.Getenv("OTEL_RESOURCE_ATTRIBUTES"); ra != "" {
m := parseResourceAttributes(ra)
if site == "" {
site = m["site_id"]
}
if region == "" {
region = m["region"]
}
}
}
return Config{
ServiceName: getenv("OTEL_SERVICE_NAME", "newt"),
ServiceVersion: os.Getenv("OTEL_SERVICE_VERSION"),
Region: os.Getenv("NEWT_REGION"),
SiteID: site,
Region: region,
PromEnabled: getenv("NEWT_METRICS_PROMETHEUS_ENABLED", "true") == "true",
OTLPEnabled: getenv("NEWT_METRICS_OTLP_ENABLED", "false") == "true",
OTLPEndpoint: getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"),
OTLPInsecure: getenv("OTEL_EXPORTER_OTLP_INSECURE", "true") == "true",
MetricExportInterval: getdur("OTEL_METRIC_EXPORT_INTERVAL", 15*time.Second),
AdminAddr: getenv("NEWT_ADMIN_ADDR", "127.0.0.1:2112"),
AdminAddr: getenv("NEWT_ADMIN_ADDR", "*********:2112"),
}
}
@@ -110,6 +129,9 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
resource.WithAttributes(attrs...),
)
// Seed global site/region for label propagation
UpdateSiteInfo(cfg.SiteID, cfg.Region)
s := &Setup{}
// Build metric readers/exporters
@@ -166,14 +188,14 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
},
},
)))
// Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments.
// Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments.
mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{Name: "newt_*"},
sdkmetric.Stream{
AttributeFilter: func(kv attribute.KeyValue) bool {
k := string(kv.Key)
switch k {
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit":
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit", "site_id", "region":
return true
default:
return false
@@ -253,6 +275,65 @@ func parseOTLPHeaders(h string) map[string]string {
return m
}
// parseResourceAttributes parses OTEL_RESOURCE_ATTRIBUTES formatted as k=v,k2=v2
func parseResourceAttributes(s string) map[string]string {
m := map[string]string{}
if s == "" {
return m
}
parts := strings.Split(s, ",")
for _, p := range parts {
kv := strings.SplitN(strings.TrimSpace(p), "=", 2)
if len(kv) == 2 {
m[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
}
}
return m
}
// Global site/region used to enrich metric labels.
var siteIDVal atomic.Value
var regionVal atomic.Value
// UpdateSiteInfo updates the global site_id and region used for metric labels.
func UpdateSiteInfo(siteID, region string) {
if siteID != "" {
siteIDVal.Store(siteID)
}
if region != "" {
regionVal.Store(region)
}
}
func getSiteID() string {
if v, ok := siteIDVal.Load().(string); ok {
return v
}
return ""
}
func getRegion() string {
if v, ok := regionVal.Load().(string); ok {
return v
}
return ""
}
// siteAttrs returns label KVs for site_id and region (if set).
func siteAttrs() []attribute.KeyValue {
var out []attribute.KeyValue
if s := getSiteID(); s != "" {
out = append(out, attribute.String("site_id", s))
}
if r := getRegion(); r != "" {
out = append(out, attribute.String("region", r))
}
return out
}
// SiteLabelKVs exposes site label KVs for other packages (e.g., proxy manager).
func SiteLabelKVs() []attribute.KeyValue { return siteAttrs() }
func getenv(k, d string) string {
if v := os.Getenv(k); v != "" {
return v