From ee2f8899ff545abba1f6e37fc9ed240f9311ff0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Sch=C3=A4fer?= Date: Wed, 8 Oct 2025 01:06:13 +0200 Subject: [PATCH] refactor(telemetry): reduce cognitive complexity by splitting registerInstruments and Init; add unregister stoppers; extract state_view helpers --- internal/telemetry/metrics.go | 223 ++++++++++++++++--------------- internal/telemetry/state_view.go | 60 +++++---- internal/telemetry/telemetry.go | 184 ++++++++++--------------- 3 files changed, 224 insertions(+), 243 deletions(-) diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 2571379..a8cecd3 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -70,117 +70,118 @@ func registerInstruments() error { var err error initOnce.Do(func() { meter = otel.Meter("newt") - - // Site / Registration - mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total", - metric.WithDescription("Total site registration attempts")) - if err != nil { - return - } - mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online", - metric.WithDescription("Site online (0/1)")) - if err != nil { - return - } - mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds", - metric.WithDescription("Seconds since last site heartbeat")) - if err != nil { - return - } - - // Tunnel / Sessions - mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions", - metric.WithDescription("Active tunnel sessions")) - if err != nil { - return - } - mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total", - metric.WithDescription("Tunnel bytes ingress/egress"), - metric.WithUnit("By")) - if err != nil { - return - } - mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds", - metric.WithDescription("Per-tunnel latency in seconds"), - metric.WithUnit("s")) - if err != nil { - return - } - mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total", - metric.WithDescription("Tunnel reconnect events")) - if err != nil { - return - } - - // Connection / NAT - mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total", - metric.WithDescription("Connection attempts")) - if err != nil { - return - } - mConnErrors, err = meter.Int64Counter("newt_connection_errors_total", - metric.WithDescription("Connection errors by type")) - if err != nil { - return - } - - // Config/Restart - mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total", - metric.WithDescription("Configuration reloads")) - mRestartCount, _ = meter.Int64Counter("newt_restart_count_total", - metric.WithDescription("Process restart count (incremented on start)")) - mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds", - metric.WithDescription("Configuration apply duration in seconds"), - metric.WithUnit("s")) - mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total", - metric.WithDescription("Certificate rotation events (success/failure)")) - - // Build info gauge (value 1 with version/commit attributes) - mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info", - metric.WithDescription("Newt build information (value is always 1)")) - - // WebSocket - mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds", - metric.WithDescription("WebSocket connect latency in seconds"), - metric.WithUnit("s")) - mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", - metric.WithDescription("WebSocket messages by direction and type")) - - // Proxy - mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", - metric.WithDescription("Proxy active connections per tunnel and protocol")) - mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes", - metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"), - metric.WithUnit("By")) - mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes", - metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"), - metric.WithUnit("By")) - mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", - metric.WithDescription("Proxy drops due to write errors")) - - // Register a default callback for build info if version/commit set - if _, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { - if buildVersion == "" && buildCommit == "" { - return nil - } - attrs := []attribute.KeyValue{} - if buildVersion != "" { - attrs = append(attrs, attribute.String("version", buildVersion)) - } - if buildCommit != "" { - attrs = append(attrs, attribute.String("commit", buildCommit)) - } - attrs = append(attrs, siteAttrs()...) - o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...)) - return nil - }, mBuildInfo); e != nil { - // forward to global OTel error handler; Init will continue but build_info will be missing - otel.Handle(e) - } + if e := registerSiteInstruments(); e != nil { err = e; return } + if e := registerTunnelInstruments(); e != nil { err = e; return } + if e := registerConnInstruments(); e != nil { err = e; return } + if e := registerConfigInstruments(); e != nil { err = e; return } + if e := registerBuildWSProxyInstruments(); e != nil { err = e; return } }) return err } +func registerSiteInstruments() error { + var err error + mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total", + metric.WithDescription("Total site registration attempts")) + if err != nil { return err } + mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online", + metric.WithDescription("Site online (0/1)")) + if err != nil { return err } + mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds", + metric.WithDescription("Seconds since last site heartbeat")) + if err != nil { return err } + return nil +} + +func registerTunnelInstruments() error { + var err error + mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions", + metric.WithDescription("Active tunnel sessions")) + if err != nil { return err } + mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total", + metric.WithDescription("Tunnel bytes ingress/egress"), + metric.WithUnit("By")) + if err != nil { return err } + mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds", + metric.WithDescription("Per-tunnel latency in seconds"), + metric.WithUnit("s")) + if err != nil { return err } + mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total", + metric.WithDescription("Tunnel reconnect events")) + if err != nil { return err } + return nil +} + +func registerConnInstruments() error { + var err error + mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total", + metric.WithDescription("Connection attempts")) + if err != nil { return err } + mConnErrors, err = meter.Int64Counter("newt_connection_errors_total", + metric.WithDescription("Connection errors by type")) + if err != nil { return err } + return nil +} + +func registerConfigInstruments() error { + mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total", + metric.WithDescription("Configuration reloads")) + mRestartCount, _ = meter.Int64Counter("newt_restart_count_total", + metric.WithDescription("Process restart count (incremented on start)")) + mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds", + metric.WithDescription("Configuration apply duration in seconds"), + metric.WithUnit("s")) + mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total", + metric.WithDescription("Certificate rotation events (success/failure)")) + return nil +} + +func registerBuildWSProxyInstruments() error { + // Build info gauge (value 1 with version/commit attributes) + mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info", + metric.WithDescription("Newt build information (value is always 1)")) + // WebSocket + mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds", + metric.WithDescription("WebSocket connect latency in seconds"), + metric.WithUnit("s")) + mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", + metric.WithDescription("WebSocket messages by direction and type")) + // Proxy + mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections", + metric.WithDescription("Proxy active connections per tunnel and protocol")) + mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes", + metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"), + metric.WithUnit("By")) + mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes", + metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"), + metric.WithUnit("By")) + mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", + metric.WithDescription("Proxy drops due to write errors")) + // Register a default callback for build info if version/commit set + reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { + if buildVersion == "" && buildCommit == "" { + return nil + } + attrs := []attribute.KeyValue{} + if buildVersion != "" { + attrs = append(attrs, attribute.String("version", buildVersion)) + } + if buildCommit != "" { + attrs = append(attrs, attribute.String("commit", buildCommit)) + } + attrs = append(attrs, siteAttrs()...) + o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...)) + return nil + }, mBuildInfo) + if e != nil { + otel.Handle(e) + } else { + // Provide a functional stopper that unregisters the callback + obsStopper = func() { _ = reg.Unregister(context.Background()) } + } + return nil +} + // Observable registration: Newt can register a callback to report gauges. // Call SetObservableCallback once to start observing online status, last // heartbeat seconds, and active sessions. @@ -216,10 +217,14 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) { // SetProxyObservableCallback registers a callback to observe proxy gauges. func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) { proxyObsOnce.Do(func() { - if _, e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte); e != nil { + reg, e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte) + if e != nil { otel.Handle(e) + proxyStopper = func() {} + return } - proxyStopper = func() {} + // Provide a functional stopper to unregister later if needed + proxyStopper = func() { _ = reg.Unregister(context.Background()) } }) } diff --git a/internal/telemetry/state_view.go b/internal/telemetry/state_view.go index 8bb22e4..275217c 100644 --- a/internal/telemetry/state_view.go +++ b/internal/telemetry/state_view.go @@ -37,30 +37,9 @@ func RegisterStateView(v StateView) { if any := stateView.Load(); any != nil { if sv, ok := any.(StateView); ok { for _, siteID := range sv.ListSites() { - if online, ok := sv.Online(siteID); ok { - val := int64(0) - if online { - val = 1 - } - o.ObserveInt64(mSiteOnline, val, metric.WithAttributes( - attribute.String("site_id", getSiteID()), - )) - } - if t, ok := sv.LastHeartbeat(siteID); ok { - secs := time.Since(t).Seconds() - o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes( - attribute.String("site_id", getSiteID()), - )) - } - // If the view supports per-tunnel sessions, report them labeled by tunnel_id. - if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok { - for tid, n := range tm.SessionsByTunnel() { - o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes( - attribute.String("site_id", getSiteID()), - attribute.String("tunnel_id", tid), - )) - } - } + observeSiteOnlineFor(o, sv, siteID) + observeLastHeartbeatFor(o, sv, siteID) + observeSessionsFor(o, any) } } } @@ -68,3 +47,36 @@ func RegisterStateView(v StateView) { }) } } + +func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) { + if online, ok := sv.Online(siteID); ok { + val := int64(0) + if online { val = 1 } + o.ObserveInt64(mSiteOnline, val, metric.WithAttributes( + attribute.String("site_id", getSiteID()), + )) + } +} + +func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) { + if t, ok := sv.LastHeartbeat(siteID); ok { + secs := time.Since(t).Seconds() + o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes( + attribute.String("site_id", getSiteID()), + )) + } +} + +func observeSessionsFor(o metric.Observer, any interface{}) { + if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok { + for tid, n := range tm.SessionsByTunnel() { + attrs := []attribute.KeyValue{ + attribute.String("site_id", getSiteID()), + } + if ShouldIncludeTunnelID() && tid != "" { + attrs = append(attrs, attribute.String("tunnel_id", tid)) + } + o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...)) + } + } +} diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 8eb0927..c064f03 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -118,143 +118,107 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { } else { includeTunnelIDVal.Store(false) } - // Build resource with required attributes and only include optional ones when non-empty + res := buildResource(ctx, cfg) + UpdateSiteInfo(cfg.SiteID, cfg.Region) + + s := &Setup{} + readers, promHandler, shutdowns, err := setupMetricExport(ctx, cfg, res) + if err != nil { return nil, err } + s.PrometheusHandler = promHandler + // Build provider + mp := buildMeterProvider(res, readers) + otel.SetMeterProvider(mp) + s.MeterProvider = mp + s.shutdowns = append(s.shutdowns, mp.Shutdown) + // Optional tracing + if cfg.OTLPEnabled { + if tp, exp := setupTracing(ctx, cfg, res); tp != nil { + otel.SetTracerProvider(tp) + s.TracerProvider = tp + s.shutdowns = append(s.shutdowns, func(c context.Context) error { + return errors.Join(exp.Shutdown(c), tp.Shutdown(c)) + }) + } + } + // Add metric exporter shutdowns + s.shutdowns = append(s.shutdowns, shutdowns...) + // Runtime metrics + _ = runtime.Start(runtime.WithMeterProvider(mp)) + // Instruments + if err := registerInstruments(); err != nil { return nil, err } + if cfg.BuildVersion != "" || cfg.BuildCommit != "" { RegisterBuildInfo(cfg.BuildVersion, cfg.BuildCommit) } + return s, nil +} + +func buildResource(ctx context.Context, cfg Config) *resource.Resource { attrs := []attribute.KeyValue{ semconv.ServiceName(cfg.ServiceName), semconv.ServiceVersion(cfg.ServiceVersion), } - if cfg.SiteID != "" { - attrs = append(attrs, attribute.String("site_id", cfg.SiteID)) - } - if cfg.Region != "" { - attrs = append(attrs, attribute.String("region", cfg.Region)) - } - res, _ := resource.New(ctx, - resource.WithFromEnv(), - resource.WithHost(), - resource.WithAttributes(attrs...), - ) + if cfg.SiteID != "" { attrs = append(attrs, attribute.String("site_id", cfg.SiteID)) } + if cfg.Region != "" { attrs = append(attrs, attribute.String("region", cfg.Region)) } + res, _ := resource.New(ctx, resource.WithFromEnv(), resource.WithHost(), resource.WithAttributes(attrs...)) + return res +} - // Seed global site/region for label propagation - UpdateSiteInfo(cfg.SiteID, cfg.Region) - - s := &Setup{} - - // Build metric readers/exporters +func setupMetricExport(ctx context.Context, cfg Config, res *resource.Resource) ([]sdkmetric.Reader, http.Handler, []func(context.Context) error, error) { var readers []sdkmetric.Reader - - // Prometheus exporter exposes a native /metrics handler for scraping + var shutdowns []func(context.Context) error + var promHandler http.Handler if cfg.PromEnabled { reg := promclient.NewRegistry() exp, err := prometheus.New(prometheus.WithRegisterer(reg)) - if err != nil { - return nil, err - } + if err != nil { return nil, nil, nil, err } readers = append(readers, exp) - s.PrometheusHandler = promhttp.HandlerFor(reg, promhttp.HandlerOpts{}) + promHandler = promhttp.HandlerFor(reg, promhttp.HandlerOpts{}) } - - // Optional OTLP metric exporter (gRPC) if cfg.OTLPEnabled { mopts := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(cfg.OTLPEndpoint)} - // Headers support via OTEL_EXPORTER_OTLP_HEADERS (k=v,k2=v2) - if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { - mopts = append(mopts, otlpmetricgrpc.WithHeaders(hdrs)) - } - if cfg.OTLPInsecure { - mopts = append(mopts, otlpmetricgrpc.WithInsecure()) - } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" { - creds, cerr := credentials.NewClientTLSFromFile(certFile, "") - if cerr == nil { - mopts = append(mopts, otlpmetricgrpc.WithTLSCredentials(creds)) - } + if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { mopts = append(mopts, otlpmetricgrpc.WithHeaders(hdrs)) } + if cfg.OTLPInsecure { mopts = append(mopts, otlpmetricgrpc.WithInsecure()) } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" { + if creds, cerr := credentials.NewClientTLSFromFile(certFile, ""); cerr == nil { mopts = append(mopts, otlpmetricgrpc.WithTLSCredentials(creds)) } } mexp, err := otlpmetricgrpc.New(ctx, mopts...) - if err != nil { - return nil, err - } + if err != nil { return nil, nil, nil, err } readers = append(readers, sdkmetric.NewPeriodicReader(mexp, sdkmetric.WithInterval(cfg.MetricExportInterval))) - s.shutdowns = append(s.shutdowns, mexp.Shutdown) + shutdowns = append(shutdowns, mexp.Shutdown) } + return readers, promHandler, shutdowns, nil +} - // Build provider options iteratively (WithReader is not variadic) +func buildMeterProvider(res *resource.Resource, readers []sdkmetric.Reader) *sdkmetric.MeterProvider { var mpOpts []sdkmetric.Option mpOpts = append(mpOpts, sdkmetric.WithResource(res)) - for _, r := range readers { - mpOpts = append(mpOpts, sdkmetric.WithReader(r)) - } - // Default view for latency histograms in seconds. + for _, r := range readers { mpOpts = append(mpOpts, sdkmetric.WithReader(r)) } mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView( - sdkmetric.Instrument{ - Name: "newt_*_latency_seconds", - }, - sdkmetric.Stream{ - Aggregation: sdkmetric.AggregationExplicitBucketHistogram{ - Boundaries: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}, - }, - }, + sdkmetric.Instrument{Name: "newt_*_latency_seconds"}, + sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{Boundaries: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}}}, ))) -// Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments. mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView( sdkmetric.Instrument{Name: "newt_*"}, - sdkmetric.Stream{ - AttributeFilter: func(kv attribute.KeyValue) bool { - k := string(kv.Key) - switch k { - case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region": - return true - default: - return false - } - }, - }, - ))) - mp := sdkmetric.NewMeterProvider(mpOpts...) - otel.SetMeterProvider(mp) - s.MeterProvider = mp - s.shutdowns = append(s.shutdowns, mp.Shutdown) - - // Optional tracing (OTLP over gRPC) - if cfg.OTLPEnabled { - topts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(cfg.OTLPEndpoint)} - if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { - topts = append(topts, otlptracegrpc.WithHeaders(hdrs)) - } - if cfg.OTLPInsecure { - topts = append(topts, otlptracegrpc.WithInsecure()) - } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" { - creds, cerr := credentials.NewClientTLSFromFile(certFile, "") - if cerr == nil { - topts = append(topts, otlptracegrpc.WithTLSCredentials(creds)) + sdkmetric.Stream{AttributeFilter: func(kv attribute.KeyValue) bool { + k := string(kv.Key) + switch k { + case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region": + return true + default: + return false } - } - exp, err := otlptracegrpc.New(ctx, topts...) - if err == nil { - tp := sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exp), - sdktrace.WithResource(res), - ) - otel.SetTracerProvider(tp) - s.TracerProvider = tp - s.shutdowns = append(s.shutdowns, func(ctx context.Context) error { - return errors.Join(exp.Shutdown(ctx), tp.Shutdown(ctx)) - }) - } - } + }}, + ))) + return sdkmetric.NewMeterProvider(mpOpts...) +} - // Export Go runtime metrics (goroutines, GC, mem, etc.) - _ = runtime.Start(runtime.WithMeterProvider(mp)) - - // Register instruments after provider is set - if err := registerInstruments(); err != nil { - return nil, err +func setupTracing(ctx context.Context, cfg Config, res *resource.Resource) (*sdktrace.TracerProvider, *otlptracegrpc.Exporter) { + topts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(cfg.OTLPEndpoint)} + if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { topts = append(topts, otlptracegrpc.WithHeaders(hdrs)) } + if cfg.OTLPInsecure { topts = append(topts, otlptracegrpc.WithInsecure()) } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" { + if creds, cerr := credentials.NewClientTLSFromFile(certFile, ""); cerr == nil { topts = append(topts, otlptracegrpc.WithTLSCredentials(creds)) } } - // Optional build info metric - if cfg.BuildVersion != "" || cfg.BuildCommit != "" { - RegisterBuildInfo(cfg.BuildVersion, cfg.BuildCommit) - } - - return s, nil + exp, err := otlptracegrpc.New(ctx, topts...) + if err != nil { return nil, nil } + tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp), sdktrace.WithResource(res)) + return tp, exp } // Shutdown flushes exporters and providers in reverse init order.