From b6f5458ad995a2e8efe332adc614ccce1c578745 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Sch=C3=A4fer?= Date: Fri, 10 Oct 2025 15:30:06 +0200 Subject: [PATCH] fix(telemetry): enhance session observation logic for tunnel IDs and site-level aggregation --- internal/telemetry/state_view.go | 36 +++++++++++++++++++++++++------- util.go | 14 +++++++------ 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/internal/telemetry/state_view.go b/internal/telemetry/state_view.go index 1a51452..071a405 100644 --- a/internal/telemetry/state_view.go +++ b/internal/telemetry/state_view.go @@ -71,14 +71,36 @@ func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) { func observeSessionsFor(o metric.Observer, siteID string, any interface{}) { if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok { - for tid, n := range tm.SessionsByTunnel() { - attrs := []attribute.KeyValue{ - attribute.String("site_id", siteID), + sessions := tm.SessionsByTunnel() + // If tunnel_id labels are enabled, preserve existing per-tunnel observations + if ShouldIncludeTunnelID() { + for tid, n := range sessions { + attrs := []attribute.KeyValue{ + attribute.String("site_id", siteID), + } + if tid != "" { + attrs = append(attrs, attribute.String("tunnel_id", tid)) + } + o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...)) } - if ShouldIncludeTunnelID() && tid != "" { - attrs = append(attrs, attribute.String("tunnel_id", tid)) - } - o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...)) + return } + // When tunnel_id is disabled, collapse per-tunnel counts into a single site-level value + var total int64 + for _, n := range sessions { + total += n + } + // If there are no per-tunnel entries, fall back to ActiveSessions() if available + if total == 0 { + if svAny := stateView.Load(); svAny != nil { + if sv, ok := svAny.(StateView); ok { + if n, ok2 := sv.ActiveSessions(siteID); ok2 { + total = n + } + } + } + } + o.ObserveInt64(mTunnelSessions, total, metric.WithAttributes(attribute.String("site_id", siteID))) + return } } diff --git a/util.go b/util.go index fa339e8..9f0d268 100644 --- a/util.go +++ b/util.go @@ -225,6 +225,8 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC } } case <-pingStopChan: + // Stop the goroutine when signaled + return } } }() @@ -293,12 +295,12 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien // More lenient threshold for declaring connection lost under load failureThreshold := 4 if consecutiveFailures >= failureThreshold && currentInterval < maxInterval { - if !connectionLost { - connectionLost = true - logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) - if tunnelID != "" { - telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout) - } + if !connectionLost { + connectionLost = true + logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) + if tunnelID != "" { + telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout) + } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility err := client.SendMessage("newt/wg/register", map[string]interface{}{