diff --git a/docker-compose-coolify.yml b/docker-compose-coolify.yml index 7073d12..e2bb1e6 100644 --- a/docker-compose-coolify.yml +++ b/docker-compose-coolify.yml @@ -1,5 +1,5 @@ services: - collector: + otel-collector: image: otel/opentelemetry-collector:0.111.0 command: ["--config=/etc/otelcol/config.yaml"] volumes: @@ -15,14 +15,14 @@ services: OTEL_SERVICE_NAME: newt NEWT_METRICS_PROMETHEUS_ENABLED: "true" NEWT_METRICS_OTLP_ENABLED: "true" - OTEL_EXPORTER_OTLP_ENDPOINT: "collector:4317" + OTEL_EXPORTER_OTLP_ENDPOINT: "otel-collector:4317" OTEL_EXPORTER_OTLP_INSECURE: "true" OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE: "cumulative" NEWT_ADMIN_ADDR: "0.0.0.0:2112" ports: - "2112:2112" depends_on: - - collector + - otel-collector prometheus: image: prom/prometheus:v2.55.0 diff --git a/docs/observability.md b/docs/observability.md index a652096..ae2d0d4 100644 --- a/docs/observability.md +++ b/docs/observability.md @@ -125,7 +125,7 @@ global: scrape_configs: - job_name: otel-collector static_configs: - - targets: ["collector:8889"] + - targets: ["otel-collector:8889"] ``` Reason mapping (source → reason) diff --git a/examples/prometheus.yml b/examples/prometheus.yml index 8b73c5c..9edb661 100644 --- a/examples/prometheus.yml +++ b/examples/prometheus.yml @@ -18,4 +18,4 @@ scrape_configs: # WARNING: Do not enable this together with the 'newt' job above or you will double-count. # - job_name: 'otel-collector' # static_configs: - # - targets: ['collector:8889'] + # - targets: ['otel-collector:8889'] diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index c75ebb9..1d11927 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -70,11 +70,26 @@ func registerInstruments() error { var err error initOnce.Do(func() { meter = otel.Meter("newt") - if e := registerSiteInstruments(); e != nil { err = e; return } - if e := registerTunnelInstruments(); e != nil { err = e; return } - if e := registerConnInstruments(); e != nil { err = e; return } - if e := registerConfigInstruments(); e != nil { err = e; return } - if e := registerBuildWSProxyInstruments(); e != nil { err = e; return } + if e := registerSiteInstruments(); e != nil { + err = e + return + } + if e := registerTunnelInstruments(); e != nil { + err = e + return + } + if e := registerConnInstruments(); e != nil { + err = e + return + } + if e := registerConfigInstruments(); e != nil { + err = e + return + } + if e := registerBuildWSProxyInstruments(); e != nil { + err = e + return + } }) return err } @@ -83,13 +98,19 @@ func registerSiteInstruments() error { var err error mSiteRegistrations, err = meter.Int64Counter("newt_site_registrations_total", metric.WithDescription("Total site registration attempts")) - if err != nil { return err } + if err != nil { + return err + } mSiteOnline, err = meter.Int64ObservableGauge("newt_site_online", metric.WithDescription("Site online (0/1)")) - if err != nil { return err } + if err != nil { + return err + } mSiteLastHeartbeat, err = meter.Float64ObservableGauge("newt_site_last_heartbeat_seconds", metric.WithDescription("Seconds since last site heartbeat")) - if err != nil { return err } + if err != nil { + return err + } return nil } @@ -97,18 +118,26 @@ func registerTunnelInstruments() error { var err error mTunnelSessions, err = meter.Int64ObservableGauge("newt_tunnel_sessions", metric.WithDescription("Active tunnel sessions")) - if err != nil { return err } + if err != nil { + return err + } mTunnelBytes, err = meter.Int64Counter("newt_tunnel_bytes_total", metric.WithDescription("Tunnel bytes ingress/egress"), metric.WithUnit("By")) - if err != nil { return err } + if err != nil { + return err + } mTunnelLatency, err = meter.Float64Histogram("newt_tunnel_latency_seconds", metric.WithDescription("Per-tunnel latency in seconds"), metric.WithUnit("s")) - if err != nil { return err } + if err != nil { + return err + } mReconnects, err = meter.Int64Counter("newt_tunnel_reconnects_total", metric.WithDescription("Tunnel reconnect events")) - if err != nil { return err } + if err != nil { + return err + } return nil } @@ -116,10 +145,14 @@ func registerConnInstruments() error { var err error mConnAttempts, err = meter.Int64Counter("newt_connection_attempts_total", metric.WithDescription("Connection attempts")) - if err != nil { return err } + if err != nil { + return err + } mConnErrors, err = meter.Int64Counter("newt_connection_errors_total", metric.WithDescription("Connection errors by type")) - if err != nil { return err } + if err != nil { + return err + } return nil } @@ -310,10 +343,13 @@ func ObserveProxyAsyncBacklogObs(o metric.Observer, value int64, attrs []attribu } func IncProxyDrops(ctx context.Context, tunnelID, protocol string) { - mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite( - attribute.String("tunnel_id", tunnelID), + attrs := []attribute.KeyValue{ attribute.String("protocol", protocol), - )...)) + } + if ShouldIncludeTunnelID() && tunnelID != "" { + attrs = append(attrs, attribute.String("tunnel_id", tunnelID)) + } + mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(attrs...)...)) } // --- Config/PKI helpers --- diff --git a/proxy/manager.go b/proxy/manager.go index ac80d8f..ceaa12b 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -275,7 +275,7 @@ func (pm *ProxyManager) Start() error { telemetry.ObserveProxyActiveConnsObs(o, e.activeTCP.Load(), e.attrOutTCP.ToSlice()) telemetry.ObserveProxyActiveConnsObs(o, e.activeUDP.Load(), e.attrOutUDP.ToSlice()) // backlog bytes (sum of unflushed counters) - b := int64(e.bytesInTCP.Load()+e.bytesOutTCP.Load()+e.bytesInUDP.Load()+e.bytesOutUDP.Load()) + b := int64(e.bytesInTCP.Load() + e.bytesOutTCP.Load() + e.bytesInUDP.Load() + e.bytesOutUDP.Load()) telemetry.ObserveProxyAsyncBacklogObs(o, b, e.attrOutTCP.ToSlice()) telemetry.ObserveProxyBufferBytesObs(o, b, e.attrOutTCP.ToSlice()) } @@ -598,14 +598,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { continue } - targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) - if e := pm.getEntry(pm.currentTunnelID); e != nil { - e.activeUDP.Add(1) - } + targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) if err != nil { logger.Error("Error connecting to target: %v", err) continue } + // Only increment activeUDP after a successful DialUDP + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.activeUDP.Add(1) + } clientsMutex.Lock() clientConns[clientKey] = targetConn @@ -656,15 +657,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { }(clientKey, targetConn, remoteAddr) } - written, err := targetConn.Write(buffer[:n]) - if err != nil { - logger.Error("Error writing to target: %v", err) - telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") - targetConn.Close() - clientsMutex.Lock() - delete(clientConns, clientKey) - clientsMutex.Unlock() - } else if pm.currentTunnelID != "" && written > 0 { + written, err := targetConn.Write(buffer[:n]) + if err != nil { + logger.Error("Error writing to target: %v", err) + telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") + targetConn.Close() + clientsMutex.Lock() + delete(clientConns, clientKey) + clientsMutex.Unlock() + } else if pm.currentTunnelID != "" && written > 0 { if pm.asyncBytes { if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written))