fix(metrics): direction=ingress|egress for bytes; remove transport on tunnel_sessions; extend allow-list (msg_type, phase); add units for histograms and bytes; handle callback errors; normalize error_type taxonomy; HTTP error mapping to enums

This commit is contained in:
Marc Schäfer
2025-10-08 00:43:53 +02:00
parent 75d5e695d6
commit 9ace45e71f
5 changed files with 42 additions and 32 deletions

View File

@@ -95,12 +95,14 @@ func registerInstruments() error {
return
}
mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total",
metric.WithDescription("Tunnel bytes in/out"))
metric.WithDescription("Tunnel bytes ingress/egress"),
metric.WithUnit("By"))
if err != nil {
return
}
mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds",
metric.WithDescription("Per-tunnel latency in seconds"))
metric.WithDescription("Per-tunnel latency in seconds"),
metric.WithUnit("s"))
if err != nil {
return
}
@@ -128,7 +130,8 @@ func registerInstruments() error {
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
metric.WithDescription("Process restart count (incremented on start)"))
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
metric.WithDescription("Configuration apply duration in seconds"))
metric.WithDescription("Configuration apply duration in seconds"),
metric.WithUnit("s"))
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
metric.WithDescription("Certificate rotation events (success/failure)"))
@@ -138,7 +141,8 @@ func registerInstruments() error {
// WebSocket
mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds",
metric.WithDescription("WebSocket connect latency in seconds"))
metric.WithDescription("WebSocket connect latency in seconds"),
metric.WithUnit("s"))
mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total",
metric.WithDescription("WebSocket messages by direction and type"))
@@ -146,14 +150,16 @@ func registerInstruments() error {
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
metric.WithDescription("Proxy active connections per tunnel and protocol"))
mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes",
metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"))
metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"),
metric.WithUnit("By"))
mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes",
metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"))
metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"),
metric.WithUnit("By"))
mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total",
metric.WithDescription("Proxy drops due to write errors"))
// Register a default callback for build info if version/commit set
meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if e := meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
if buildVersion == "" && buildCommit == "" {
return nil
}
@@ -167,7 +173,10 @@ func registerInstruments() error {
attrs = append(attrs, siteAttrs()...)
o.ObserveInt64(mBuildInfo, 1, metric.WithAttributes(attrs...))
return nil
}, mBuildInfo)
}, mBuildInfo); e != nil {
// forward to global OTel error handler; Init will continue but build_info will be missing
otel.Handle(e)
}
})
return err
}
@@ -197,7 +206,9 @@ var (
// })
func SetObservableCallback(cb func(context.Context, metric.Observer) error) {
obsOnce.Do(func() {
meter.RegisterCallback(cb, mSiteOnline, mSiteLastHeartbeat, mTunnelSessions)
if e := meter.RegisterCallback(cb, mSiteOnline, mSiteLastHeartbeat, mTunnelSessions); e != nil {
otel.Handle(e)
}
obsStopper = func() { /* no-op; otel callbacks are unregistered when provider shuts down */ }
})
}
@@ -205,7 +216,9 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) {
// SetProxyObservableCallback registers a callback to observe proxy gauges.
func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) {
proxyObsOnce.Do(func() {
meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte)
if e := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte); e != nil {
otel.Handle(e)
}
proxyStopper = func() {}
})
}

View File

@@ -58,7 +58,6 @@ func RegisterStateView(v StateView) {
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(
attribute.String("site_id", getSiteID()),
attribute.String("tunnel_id", tid),
attribute.String("transport", "wireguard"),
))
}
}

View File

@@ -195,7 +195,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
AttributeFilter: func(kv attribute.KeyValue) bool {
k := string(kv.Key)
switch k {
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "version", "commit", "site_id", "region":
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "msg_type", "phase", "version", "commit", "site_id", "region":
return true
default:
return false
@@ -296,6 +296,8 @@ var siteIDVal atomic.Value
var regionVal atomic.Value
// UpdateSiteInfo updates the global site_id and region used for metric labels.
// Thread-safe via atomic.Value: subsequent metric emissions will include
// the new labels, prior emissions remain unchanged.
func UpdateSiteInfo(siteID, region string) {
if siteID != "" {
siteIDVal.Store(siteID)

View File

@@ -119,22 +119,22 @@ func (pm *ProxyManager) SetTunnelID(id string) {
site := telemetry.SiteLabelKVs()
e.attrInTCP = attribute.NewSet(append(site,
attribute.String("tunnel_id", id),
attribute.String("direction", "in"),
attribute.String("direction", "ingress"),
attribute.String("protocol", "tcp"),
)...)
e.attrOutTCP = attribute.NewSet(append(site,
attribute.String("tunnel_id", id),
attribute.String("direction", "out"),
attribute.String("direction", "egress"),
attribute.String("protocol", "tcp"),
)...)
e.attrInUDP = attribute.NewSet(append(site,
attribute.String("tunnel_id", id),
attribute.String("direction", "in"),
attribute.String("direction", "ingress"),
attribute.String("protocol", "udp"),
)...)
e.attrOutUDP = attribute.NewSet(append(site,
attribute.String("tunnel_id", id),
attribute.String("direction", "out"),
attribute.String("direction", "egress"),
attribute.String("protocol", "udp"),
)...)
}

View File

@@ -304,13 +304,11 @@ func (c *Client) getToken() (string, error) {
body, _ := io.ReadAll(resp.Body)
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
telemetry.IncConnAttempt(context.Background(), "auth", "failure")
bin := "http_other"
if resp.StatusCode >= 500 {
bin = "http_5xx"
} else if resp.StatusCode >= 400 {
bin = "http_4xx"
etype := "io_error"
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
etype = "auth_failed"
}
telemetry.IncConnError(context.Background(), "auth", bin)
telemetry.IncConnError(context.Background(), "auth", etype)
// Reconnect reason mapping for auth failures
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonAuthError)
@@ -338,7 +336,8 @@ func (c *Client) getToken() (string, error) {
return tokenResp.Data.Token, nil
}
// classifyConnError maps common errors to low-cardinality error_type labels
// classifyConnError maps to fixed, low-cardinality error_type values.
// Allowed enum: dial_timeout, tls_handshake, auth_failed, io_error
func classifyConnError(err error) string {
if err == nil {
return ""
@@ -346,17 +345,14 @@ func classifyConnError(err error) string {
msg := strings.ToLower(err.Error())
switch {
case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"):
return "tls"
case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"):
return "timeout"
case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"):
return "dns"
return "tls_handshake"
case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout") || strings.Contains(msg, "deadline exceeded"):
return "dial_timeout"
case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"):
return "auth"
case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"):
return "io"
return "auth_failed"
default:
return "other"
// Group remaining network/socket errors as io_error to avoid label explosion
return "io_error"
}
}