refactor(telemetry): reduce cognitive complexity by splitting registerInstruments and Init; add unregister stoppers; extract state_view helpers

This commit is contained in:
Marc Schäfer
2025-10-08 01:06:13 +02:00
parent 744a741556
commit ee2f8899ff
3 changed files with 224 additions and 243 deletions

View File

@@ -70,117 +70,118 @@ func registerInstruments() error {
var err error
initOnce.Do(func() {
meter = otel.Meter("newt")
// Site / Registration
mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total",
metric.WithDescription("Total site registration attempts"))
if err != nil {
return
}
mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online",
metric.WithDescription("Site online (0/1)"))
if err != nil {
return
}
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds",
metric.WithDescription("Seconds since last site heartbeat"))
if err != nil {
return
}
// Tunnel / Sessions
mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions",
metric.WithDescription("Active tunnel sessions"))
if err != nil {
return
}
mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total",
metric.WithDescription("Tunnel bytes ingress/egress"),
metric.WithUnit("By"))
if err != nil {
return
}
mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds",
metric.WithDescription("Per-tunnel latency in seconds"),
metric.WithUnit("s"))
if err != nil {
return
}
mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total",
metric.WithDescription("Tunnel reconnect events"))
if err != nil {
return
}
// Connection / NAT
mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total",
metric.WithDescription("Connection attempts"))
if err != nil {
return
}
mConnErrors, err = meter.Int64Counter("newt_connection_errors_total",
metric.WithDescription("Connection errors by type"))
if err != nil {
return
}
// Config/Restart
mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total",
metric.WithDescription("Configuration reloads"))
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
metric.WithDescription("Process restart count (incremented on start)"))
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
metric.WithDescription("Configuration apply duration in seconds"),
metric.WithUnit("s"))
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
metric.WithDescription("Certificate rotation events (success/failure)"))
// Build info gauge (value 1 with version/commit attributes)
mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info",
metric.WithDescription("Newt build information (value is always 1)"))
// WebSocket
mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds",
metric.WithDescription("WebSocket connect latency in seconds"),
metric.WithUnit("s"))
mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total",
metric.WithDescription("WebSocket messages by direction and type"))
// Proxy
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
metric.WithDescription("Proxy active connections per tunnel and protocol"))
mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes",
metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"),
metric.WithUnit("By"))
mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes",
metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"),
metric.WithUnit("By"))
mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total",
metric.WithDescription("Proxy drops due to write errors"))
// Register a default callback for build info if version/commit set
if _, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if buildVersion == "" && buildCommit == "" {
return nil
}
attrs := []attribute.KeyValue{}
if buildVersion != "" {
attrs = append(attrs, attribute.String("version", buildVersion))
}
if buildCommit != "" {
attrs = append(attrs, attribute.String("commit", buildCommit))
}
attrs = append(attrs, siteAttrs()...)
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
return nil
}, mBuildInfo); e != nil {
// forward to global OTel error handler; Init will continue but build_info will be missing
otel.Handle(e)
}
if e := registerSiteInstruments(); e != nil { err = e; return }
if e := registerTunnelInstruments(); e != nil { err = e; return }
if e := registerConnInstruments(); e != nil { err = e; return }
if e := registerConfigInstruments(); e != nil { err = e; return }
if e := registerBuildWSProxyInstruments(); e != nil { err = e; return }
})
return err
}
func registerSiteInstruments() error {
var err error
mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total",
metric.WithDescription("Total site registration attempts"))
if err != nil { return err }
mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online",
metric.WithDescription("Site online (0/1)"))
if err != nil { return err }
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds",
metric.WithDescription("Seconds since last site heartbeat"))
if err != nil { return err }
return nil
}
func registerTunnelInstruments() error {
var err error
mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions",
metric.WithDescription("Active tunnel sessions"))
if err != nil { return err }
mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total",
metric.WithDescription("Tunnel bytes ingress/egress"),
metric.WithUnit("By"))
if err != nil { return err }
mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds",
metric.WithDescription("Per-tunnel latency in seconds"),
metric.WithUnit("s"))
if err != nil { return err }
mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total",
metric.WithDescription("Tunnel reconnect events"))
if err != nil { return err }
return nil
}
func registerConnInstruments() error {
var err error
mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total",
metric.WithDescription("Connection attempts"))
if err != nil { return err }
mConnErrors, err = meter.Int64Counter("newt_connection_errors_total",
metric.WithDescription("Connection errors by type"))
if err != nil { return err }
return nil
}
func registerConfigInstruments() error {
mConfigReloads, _ = meter.Int64Counter("newt_config_reloads_total",
metric.WithDescription("Configuration reloads"))
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
metric.WithDescription("Process restart count (incremented on start)"))
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
metric.WithDescription("Configuration apply duration in seconds"),
metric.WithUnit("s"))
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
metric.WithDescription("Certificate rotation events (success/failure)"))
return nil
}
func registerBuildWSProxyInstruments() error {
// Build info gauge (value 1 with version/commit attributes)
mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info",
metric.WithDescription("Newt build information (value is always 1)"))
// WebSocket
mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds",
metric.WithDescription("WebSocket connect latency in seconds"),
metric.WithUnit("s"))
mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total",
metric.WithDescription("WebSocket messages by direction and type"))
// Proxy
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
metric.WithDescription("Proxy active connections per tunnel and protocol"))
mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes",
metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"),
metric.WithUnit("By"))
mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes",
metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"),
metric.WithUnit("By"))
mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total",
metric.WithDescription("Proxy drops due to write errors"))
// Register a default callback for build info if version/commit set
reg, e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if buildVersion == "" && buildCommit == "" {
return nil
}
attrs := []attribute.KeyValue{}
if buildVersion != "" {
attrs = append(attrs, attribute.String("version", buildVersion))
}
if buildCommit != "" {
attrs = append(attrs, attribute.String("commit", buildCommit))
}
attrs = append(attrs, siteAttrs()...)
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
return nil
}, mBuildInfo)
if e != nil {
otel.Handle(e)
} else {
// Provide a functional stopper that unregisters the callback
obsStopper = func() { _ = reg.Unregister(context.Background()) }
}
return nil
}
// Observable registration: Newt can register a callback to report gauges.
// Call SetObservableCallback once to start observing online status, last
// heartbeat seconds, and active sessions.
@@ -216,10 +217,14 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) {
// SetProxyObservableCallback registers a callback to observe proxy gauges.
func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) {
proxyObsOnce.Do(func() {
if _, e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte); e != nil {
reg, e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte)
if e != nil {
otel.Handle(e)
proxyStopper = func() {}
return
}
proxyStopper = func() {}
// Provide a functional stopper to unregister later if needed
proxyStopper = func() { _ = reg.Unregister(context.Background()) }
})
}

View File

@@ -37,30 +37,9 @@ func RegisterStateView(v StateView) {
if any := stateView.Load(); any != nil {
if sv, ok := any.(StateView); ok {
for _, siteID := range sv.ListSites() {
if online, ok := sv.Online(siteID); ok {
val := int64(0)
if online {
val = 1
}
o.ObserveInt64(mSiteOnline, val, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
))
}
if t, ok := sv.LastHeartbeat(siteID); ok {
secs := time.Since(t).Seconds()
o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
))
}
// If the view supports per-tunnel sessions, report them labeled by tunnel_id.
if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok {
for tid, n := range tm.SessionsByTunnel() {
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
attribute.String("tunnel_id", tid),
))
}
}
observeSiteOnlineFor(o, sv, siteID)
observeLastHeartbeatFor(o, sv, siteID)
observeSessionsFor(o, any)
}
}
}
@@ -68,3 +47,36 @@ func RegisterStateView(v StateView) {
})
}
}
func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) {
if online, ok := sv.Online(siteID); ok {
val := int64(0)
if online { val = 1 }
o.ObserveInt64(mSiteOnline, val, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
))
}
}
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
if t, ok := sv.LastHeartbeat(siteID); ok {
secs := time.Since(t).Seconds()
o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
))
}
}
func observeSessionsFor(o metric.Observer, any interface{}) {
if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok {
for tid, n := range tm.SessionsByTunnel() {
attrs := []attribute.KeyValue{
attribute.String("site_id", getSiteID()),
}
if ShouldIncludeTunnelID() && tid != "" {
attrs = append(attrs, attribute.String("tunnel_id", tid))
}
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...))
}
}
}

View File

@@ -118,143 +118,107 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
} else {
includeTunnelIDVal.Store(false)
}
// Build resource with required attributes and only include optional ones when non-empty
res := buildResource(ctx, cfg)
UpdateSiteInfo(cfg.SiteID, cfg.Region)
s := &Setup{}
readers, promHandler, shutdowns, err := setupMetricExport(ctx, cfg, res)
if err != nil { return nil, err }
s.PrometheusHandler = promHandler
// Build provider
mp := buildMeterProvider(res, readers)
otel.SetMeterProvider(mp)
s.MeterProvider = mp
s.shutdowns = append(s.shutdowns, mp.Shutdown)
// Optional tracing
if cfg.OTLPEnabled {
if tp, exp := setupTracing(ctx, cfg, res); tp != nil {
otel.SetTracerProvider(tp)
s.TracerProvider = tp
s.shutdowns = append(s.shutdowns, func(c context.Context) error {
return errors.Join(exp.Shutdown(c), tp.Shutdown(c))
})
}
}
// Add metric exporter shutdowns
s.shutdowns = append(s.shutdowns, shutdowns...)
// Runtime metrics
_ = runtime.Start(runtime.WithMeterProvider(mp))
// Instruments
if err := registerInstruments(); err != nil { return nil, err }
if cfg.BuildVersion != "" || cfg.BuildCommit != "" { RegisterBuildInfo(cfg.BuildVersion, cfg.BuildCommit) }
return s, nil
}
func buildResource(ctx context.Context, cfg Config) *resource.Resource {
attrs := []attribute.KeyValue{
semconv.ServiceName(cfg.ServiceName),
semconv.ServiceVersion(cfg.ServiceVersion),
}
if cfg.SiteID != "" {
attrs = append(attrs, attribute.String("site_id", cfg.SiteID))
}
if cfg.Region != "" {
attrs = append(attrs, attribute.String("region", cfg.Region))
}
res, _ := resource.New(ctx,
resource.WithFromEnv(),
resource.WithHost(),
resource.WithAttributes(attrs...),
)
if cfg.SiteID != "" { attrs = append(attrs, attribute.String("site_id", cfg.SiteID)) }
if cfg.Region != "" { attrs = append(attrs, attribute.String("region", cfg.Region)) }
res, _ := resource.New(ctx, resource.WithFromEnv(), resource.WithHost(), resource.WithAttributes(attrs...))
return res
}
// Seed global site/region for label propagation
UpdateSiteInfo(cfg.SiteID, cfg.Region)
s := &Setup{}
// Build metric readers/exporters
func setupMetricExport(ctx context.Context, cfg Config, res *resource.Resource) ([]sdkmetric.Reader, http.Handler, []func(context.Context) error, error) {
var readers []sdkmetric.Reader
// Prometheus exporter exposes a native /metrics handler for scraping
var shutdowns []func(context.Context) error
var promHandler http.Handler
if cfg.PromEnabled {
reg := promclient.NewRegistry()
exp, err := prometheus.New(prometheus.WithRegisterer(reg))
if err != nil {
return nil, err
}
if err != nil { return nil, nil, nil, err }
readers = append(readers, exp)
s.PrometheusHandler = promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
promHandler = promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
}
// Optional OTLP metric exporter (gRPC)
if cfg.OTLPEnabled {
mopts := []otlpmetricgrpc.Option{otlpmetricgrpc.WithEndpoint(cfg.OTLPEndpoint)}
// Headers support via OTEL_EXPORTER_OTLP_HEADERS (k=v,k2=v2)
if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 {
mopts = append(mopts, otlpmetricgrpc.WithHeaders(hdrs))
}
if cfg.OTLPInsecure {
mopts = append(mopts, otlpmetricgrpc.WithInsecure())
} else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" {
creds, cerr := credentials.NewClientTLSFromFile(certFile, "")
if cerr == nil {
mopts = append(mopts, otlpmetricgrpc.WithTLSCredentials(creds))
}
if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { mopts = append(mopts, otlpmetricgrpc.WithHeaders(hdrs)) }
if cfg.OTLPInsecure { mopts = append(mopts, otlpmetricgrpc.WithInsecure()) } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" {
if creds, cerr := credentials.NewClientTLSFromFile(certFile, ""); cerr == nil { mopts = append(mopts, otlpmetricgrpc.WithTLSCredentials(creds)) }
}
mexp, err := otlpmetricgrpc.New(ctx, mopts...)
if err != nil {
return nil, err
}
if err != nil { return nil, nil, nil, err }
readers = append(readers, sdkmetric.NewPeriodicReader(mexp, sdkmetric.WithInterval(cfg.MetricExportInterval)))
s.shutdowns = append(s.shutdowns, mexp.Shutdown)
shutdowns = append(shutdowns, mexp.Shutdown)
}
return readers, promHandler, shutdowns, nil
}
// Build provider options iteratively (WithReader is not variadic)
func buildMeterProvider(res *resource.Resource, readers []sdkmetric.Reader) *sdkmetric.MeterProvider {
var mpOpts []sdkmetric.Option
mpOpts = append(mpOpts, sdkmetric.WithResource(res))
for _, r := range readers {
mpOpts = append(mpOpts, sdkmetric.WithReader(r))
}
// Default view for latency histograms in seconds.
for _, r := range readers { mpOpts = append(mpOpts, sdkmetric.WithReader(r)) }
mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{
Name: "newt_*_latency_seconds",
},
sdkmetric.Stream{
Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30},
},
},
sdkmetric.Instrument{Name: "newt_*_latency_seconds"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{Boundaries: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30}}},
)))
// Attribute whitelist: only allow expected low-cardinality keys on newt_* instruments.
mpOpts = append(mpOpts, sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{Name: "newt_*"},
sdkmetric.Stream{
AttributeFilter: func(kv attribute.KeyValue) bool {
k := string(kv.Key)
switch k {
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region":
return true
default:
return false
}
},
},
)))
mp := sdkmetric.NewMeterProvider(mpOpts...)
otel.SetMeterProvider(mp)
s.MeterProvider = mp
s.shutdowns = append(s.shutdowns, mp.Shutdown)
// Optional tracing (OTLP over gRPC)
if cfg.OTLPEnabled {
topts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(cfg.OTLPEndpoint)}
if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 {
topts = append(topts, otlptracegrpc.WithHeaders(hdrs))
}
if cfg.OTLPInsecure {
topts = append(topts, otlptracegrpc.WithInsecure())
} else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" {
creds, cerr := credentials.NewClientTLSFromFile(certFile, "")
if cerr == nil {
topts = append(topts, otlptracegrpc.WithTLSCredentials(creds))
sdkmetric.Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
k := string(kv.Key)
switch k {
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region":
return true
default:
return false
}
}
exp, err := otlptracegrpc.New(ctx, topts...)
if err == nil {
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exp),
sdktrace.WithResource(res),
)
otel.SetTracerProvider(tp)
s.TracerProvider = tp
s.shutdowns = append(s.shutdowns, func(ctx context.Context) error {
return errors.Join(exp.Shutdown(ctx), tp.Shutdown(ctx))
})
}
}
}},
)))
return sdkmetric.NewMeterProvider(mpOpts...)
}
// Export Go runtime metrics (goroutines, GC, mem, etc.)
_ = runtime.Start(runtime.WithMeterProvider(mp))
// Register instruments after provider is set
if err := registerInstruments(); err != nil {
return nil, err
func setupTracing(ctx context.Context, cfg Config, res *resource.Resource) (*sdktrace.TracerProvider, *otlptracegrpc.Exporter) {
topts := []otlptracegrpc.Option{otlptracegrpc.WithEndpoint(cfg.OTLPEndpoint)}
if hdrs := parseOTLPHeaders(os.Getenv("OTEL_EXPORTER_OTLP_HEADERS")); len(hdrs) > 0 { topts = append(topts, otlptracegrpc.WithHeaders(hdrs)) }
if cfg.OTLPInsecure { topts = append(topts, otlptracegrpc.WithInsecure()) } else if certFile := os.Getenv("OTEL_EXPORTER_OTLP_CERTIFICATE"); certFile != "" {
if creds, cerr := credentials.NewClientTLSFromFile(certFile, ""); cerr == nil { topts = append(topts, otlptracegrpc.WithTLSCredentials(creds)) }
}
// Optional build info metric
if cfg.BuildVersion != "" || cfg.BuildCommit != "" {
RegisterBuildInfo(cfg.BuildVersion, cfg.BuildCommit)
}
return s, nil
exp, err := otlptracegrpc.New(ctx, topts...)
if err != nil { return nil, nil }
tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exp), sdktrace.WithResource(res))
return tp, exp
}
// Shutdown flushes exporters and providers in reverse init order.