fix(telemetry): enhance session observation logic for tunnel IDs and site-level aggregation

This commit is contained in:
Marc Schäfer
2025-10-10 15:30:06 +02:00
parent 4ef9737862
commit b6f5458ad9
2 changed files with 37 additions and 13 deletions

View File

@@ -71,14 +71,36 @@ func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
// NOTE(review): this span is a side-by-side diff rendering. Where a line shows
// two statements, the first is the pre-change text and the second is the
// post-change text; lines with a single statement belong to one side only.
//
// observeSessionsFor reports session counts for siteID on mTunnelSessions.
// Post-change behaviour, as visible here:
//   - If the `any` argument does not provide SessionsByTunnel(), nothing is observed.
//   - If ShouldIncludeTunnelID() is true, one observation is made per map entry,
//     labelled with site_id and, when the key is non-empty, tunnel_id.
//   - Otherwise the per-tunnel counts are summed into one value labelled with
//     site_id only. When that sum is zero, the value from
//     StateView.ActiveSessions(siteID) is used instead, if stateView holds a
//     StateView and the call reports ok.
func observeSessionsFor(o metric.Observer, siteID string, any interface{}) { func observeSessionsFor(o metric.Observer, siteID string, any interface{}) {
if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok { if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok {
for tid, n := range tm.SessionsByTunnel() { sessions := tm.SessionsByTunnel()
attrs := []attribute.KeyValue{ // If tunnel_id labels are enabled, preserve existing per-tunnel observations
attribute.String("site_id", siteID), if ShouldIncludeTunnelID() {
for tid, n := range sessions {
attrs := []attribute.KeyValue{
attribute.String("site_id", siteID),
}
if tid != "" {
attrs = append(attrs, attribute.String("tunnel_id", tid))
}
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...))
} }
if ShouldIncludeTunnelID() && tid != "" { return
attrs = append(attrs, attribute.String("tunnel_id", tid))
}
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...))
} }
// When tunnel_id is disabled, collapse per-tunnel counts into a single site-level value
var total int64
for _, n := range sessions {
total += n
}
// If the summed count is zero (no per-tunnel entries, or every entry is zero),
// fall back to ActiveSessions() if available
if total == 0 {
if svAny := stateView.Load(); svAny != nil {
if sv, ok := svAny.(StateView); ok {
if n, ok2 := sv.ActiveSessions(siteID); ok2 {
total = n
}
}
}
}
o.ObserveInt64(mTunnelSessions, total, metric.WithAttributes(attribute.String("site_id", siteID)))
return
} }
} }

14
util.go
View File

@@ -225,6 +225,8 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
} }
} }
case <-pingStopChan: case <-pingStopChan:
// Stop the goroutine when signaled
return
} }
} }
}() }()
@@ -293,12 +295,12 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
// More lenient threshold for declaring connection lost under load // More lenient threshold for declaring connection lost under load
failureThreshold := 4 failureThreshold := 4
if consecutiveFailures >= failureThreshold && currentInterval < maxInterval { if consecutiveFailures >= failureThreshold && currentInterval < maxInterval {
if !connectionLost { if !connectionLost {
connectionLost = true connectionLost = true
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
if tunnelID != "" { if tunnelID != "" {
telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout) telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout)
} }
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
// Send registration message to the server for backward compatibility // Send registration message to the server for backward compatibility
err := client.SendMessage("newt/wg/register", map[string]interface{}{ err := client.SendMessage("newt/wg/register", map[string]interface{}{