fix(docker-compose, prometheus, telemetry, proxy): standardize collector naming and improve error handling

This commit is contained in:
Marc Schäfer
2025-10-10 14:42:05 +02:00
parent 8d0e6be2c7
commit bd62da4cc9
5 changed files with 73 additions and 36 deletions

View File

@@ -1,5 +1,5 @@
services: services:
collector: otel-collector:
image: otel/opentelemetry-collector:0.111.0 image: otel/opentelemetry-collector:0.111.0
command: ["--config=/etc/otelcol/config.yaml"] command: ["--config=/etc/otelcol/config.yaml"]
volumes: volumes:
@@ -15,14 +15,14 @@ services:
OTEL_SERVICE_NAME: newt OTEL_SERVICE_NAME: newt
NEWT_METRICS_PROMETHEUS_ENABLED: "true" NEWT_METRICS_PROMETHEUS_ENABLED: "true"
NEWT_METRICS_OTLP_ENABLED: "true" NEWT_METRICS_OTLP_ENABLED: "true"
OTEL_EXPORTER_OTLP_ENDPOINT: "collector:4317" OTEL_EXPORTER_OTLP_ENDPOINT: "otel-collector:4317"
OTEL_EXPORTER_OTLP_INSECURE: "true" OTEL_EXPORTER_OTLP_INSECURE: "true"
OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "cumulative" OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "cumulative"
NEWT_ADMIN_ADDR: "0.0.0.0:2112" NEWT_ADMIN_ADDR: "0.0.0.0:2112"
ports: ports:
- "2112:2112" - "2112:2112"
depends_on: depends_on:
- collector - otel-collector
prometheus: prometheus:
image: prom/prometheus:v2.55.0 image: prom/prometheus:v2.55.0

View File

@@ -125,7 +125,7 @@ global:
scrape_configs: scrape_configs:
- job_name: otel-collector - job_name: otel-collector
static_configs: static_configs:
- targets: ["collector:8889"] - targets: ["otel-collector:8889"]
``` ```
Reason mapping (source → reason) Reason mapping (source → reason)

View File

@@ -18,4 +18,4 @@ scrape_configs:
# WARNING: Do not enable this together with the 'newt' job above or you will double-count. # WARNING: Do not enable this together with the 'newt' job above or you will double-count.
# - job_name: 'otel-collector' # - job_name: 'otel-collector'
# static_configs: # static_configs:
# - targets: ['collector:8889'] # - targets: ['otel-collector:8889']

View File

@@ -70,11 +70,26 @@ func registerInstruments() error {
var err error var err error
initOnce.Do(func() { initOnce.Do(func() {
meter = otel.Meter("newt") meter = otel.Meter("newt")
if e := registerSiteInstruments(); e != nil { err = e; return } if e := registerSiteInstruments(); e != nil {
if e := registerTunnelInstruments(); e != nil { err = e; return } err = e
if e := registerConnInstruments(); e != nil { err = e; return } return
if e := registerConfigInstruments(); e != nil { err = e; return } }
if e := registerBuildWSProxyInstruments(); e != nil { err = e; return } if e := registerTunnelInstruments(); e != nil {
err = e
return
}
if e := registerConnInstruments(); e != nil {
err = e
return
}
if e := registerConfigInstruments(); e != nil {
err = e
return
}
if e := registerBuildWSProxyInstruments(); e != nil {
err = e
return
}
}) })
return err return err
} }
@@ -83,13 +98,19 @@ func registerSiteInstruments() error {
var err error var err error
mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total", mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total",
metric.WithDescription("Total site registration attempts")) metric.WithDescription("Total site registration attempts"))
if err != nil { return err } if err != nil {
return err
}
mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online", mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online",
metric.WithDescription("Site online (0/1)")) metric.WithDescription("Site online (0/1)"))
if err != nil { return err } if err != nil {
return err
}
mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds", mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds",
metric.WithDescription("Seconds since last site heartbeat")) metric.WithDescription("Seconds since last site heartbeat"))
if err != nil { return err } if err != nil {
return err
}
return nil return nil
} }
@@ -97,18 +118,26 @@ func registerTunnelInstruments() error {
var err error var err error
mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions", mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions",
metric.WithDescription("Active tunnel sessions")) metric.WithDescription("Active tunnel sessions"))
if err != nil { return err } if err != nil {
return err
}
mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total", mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total",
metric.WithDescription("Tunnel bytes ingress/egress"), metric.WithDescription("Tunnel bytes ingress/egress"),
metric.WithUnit("By")) metric.WithUnit("By"))
if err != nil { return err } if err != nil {
return err
}
mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds", mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds",
metric.WithDescription("Per-tunnel latency in seconds"), metric.WithDescription("Per-tunnel latency in seconds"),
metric.WithUnit("s")) metric.WithUnit("s"))
if err != nil { return err } if err != nil {
return err
}
mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total", mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total",
metric.WithDescription("Tunnel reconnect events")) metric.WithDescription("Tunnel reconnect events"))
if err != nil { return err } if err != nil {
return err
}
return nil return nil
} }
@@ -116,10 +145,14 @@ func registerConnInstruments() error {
var err error var err error
mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total", mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total",
metric.WithDescription("Connection attempts")) metric.WithDescription("Connection attempts"))
if err != nil { return err } if err != nil {
return err
}
mConnErrors, err = meter.Int64Counter("newt_connection_errors_total", mConnErrors, err = meter.Int64Counter("newt_connection_errors_total",
metric.WithDescription("Connection errors by type")) metric.WithDescription("Connection errors by type"))
if err != nil { return err } if err != nil {
return err
}
return nil return nil
} }
@@ -310,10 +343,13 @@ func ObserveProxyAsyncBacklogObs(o metric.Observer, value int64, attrs []attribu
} }
func IncProxyDrops(ctx context.Context, tunnelID, protocol string) { func IncProxyDrops(ctx context.Context, tunnelID, protocol string) {
mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite( attrs := []attribute.KeyValue{
attribute.String("tunnel_id", tunnelID),
attribute.String("protocol", protocol), attribute.String("protocol", protocol),
)...)) }
if ShouldIncludeTunnelID() && tunnelID != "" {
attrs = append(attrs, attribute.String("tunnel_id", tunnelID))
}
mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...))
} }
// --- Config/PKI helpers --- // --- Config/PKI helpers ---

View File

@@ -275,7 +275,7 @@ func (pm *ProxyManager) Start() error {
telemetry.ObserveProxyActiveConnsObs(o, e.activeTCP.Load(), e.attrOutTCP.ToSlice()) telemetry.ObserveProxyActiveConnsObs(o, e.activeTCP.Load(), e.attrOutTCP.ToSlice())
telemetry.ObserveProxyActiveConnsObs(o, e.activeUDP.Load(), e.attrOutUDP.ToSlice()) telemetry.ObserveProxyActiveConnsObs(o, e.activeUDP.Load(), e.attrOutUDP.ToSlice())
// backlog bytes (sum of unflushed counters) // backlog bytes (sum of unflushed counters)
b := int64(e.bytesInTCP.Load()+e.bytesOutTCP.Load()+e.bytesInUDP.Load()+e.bytesOutUDP.Load()) b := int64(e.bytesInTCP.Load() + e.bytesOutTCP.Load() + e.bytesInUDP.Load() + e.bytesOutUDP.Load())
telemetry.ObserveProxyAsyncBacklogObs(o, b, e.attrOutTCP.ToSlice()) telemetry.ObserveProxyAsyncBacklogObs(o, b, e.attrOutTCP.ToSlice())
telemetry.ObserveProxyBufferBytesObs(o, b, e.attrOutTCP.ToSlice()) telemetry.ObserveProxyBufferBytesObs(o, b, e.attrOutTCP.ToSlice())
} }
@@ -598,14 +598,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
continue continue
} }
targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) targetConn, err = net.DialUDP("udp", nil, targetUDPAddr)
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.activeUDP.Add(1)
}
if err != nil { if err != nil {
logger.Error("Error connecting to target: %v", err) logger.Error("Error connecting to target: %v", err)
continue continue
} }
// Only increment activeUDP after a successful DialUDP
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.activeUDP.Add(1)
}
clientsMutex.Lock() clientsMutex.Lock()
clientConns[clientKey] = targetConn clientConns[clientKey] = targetConn
@@ -656,15 +657,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}(clientKey, targetConn, remoteAddr) }(clientKey, targetConn, remoteAddr)
} }
written, err := targetConn.Write(buffer[:n]) written, err := targetConn.Write(buffer[:n])
if err != nil { if err != nil {
logger.Error("Error writing to target: %v", err) logger.Error("Error writing to target: %v", err)
telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp")
targetConn.Close() targetConn.Close()
clientsMutex.Lock() clientsMutex.Lock()
delete(clientConns, clientKey) delete(clientConns, clientKey)
clientsMutex.Unlock() clientsMutex.Unlock()
} else if pm.currentTunnelID != "" && written > 0 { } else if pm.currentTunnelID != "" && written > 0 {
if pm.asyncBytes { if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil { if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(written)) e.bytesInUDP.Add(uint64(written))