Files
newt/patches/02_reconnect_rtt.patch
Marc Schäfer b53fb70778 feat: Implement telemetry for reconnect reasons and RTT reporting
- Added telemetry hooks to track reconnect reasons for WireGuard connections, including server requests and authentication errors.
- Introduced RTT reporting to telemetry for better latency monitoring.
- Enhanced metrics configuration with flags for Prometheus and OTLP exporters.
- Implemented graceful shutdown and signal handling in the main application.
- Updated WebSocket client to classify connection errors and report them to telemetry.
- Added support for async byte counting in metrics.
- Improved handling of reconnect scenarios in the WireGuard service.
- Added documentation for applying patches and rollback procedures.
2025-10-07 09:17:05 +02:00

467 lines
17 KiB
Diff

diff --git a/main.go b/main.go
index 12849b1..c223b75 100644
--- a/main.go
+++ b/main.go
@@ -1,7 +1,9 @@
package main
import (
+ "context"
"encoding/json"
+ "errors"
"flag"
"fmt"
"net"
@@ -22,6 +24,9 @@ import (
"github.com/fosrl/newt/updates"
"github.com/fosrl/newt/websocket"
+ "github.com/fosrl/newt/internal/state"
+ "github.com/fosrl/newt/internal/telemetry"
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"golang.zx2c4.com/wireguard/conn"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun"
@@ -116,6 +121,13 @@ var (
healthMonitor *healthcheck.Monitor
enforceHealthcheckCert bool
+ // Observability/metrics flags
+ metricsEnabled bool
+ otlpEnabled bool
+ adminAddr string
+ region string
+ metricsAsyncBytes bool
+
// New mTLS configuration variables
tlsClientCert string
tlsClientKey string
@@ -126,6 +138,10 @@ var (
)
func main() {
+ // Prepare context for graceful shutdown and signal handling
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
+ defer stop()
+
// if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values
endpoint = os.Getenv("PANGOLIN_ENDPOINT")
id = os.Getenv("NEWT_ID")
@@ -141,6 +157,13 @@ func main() {
useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE")
enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT")
+ // Metrics/observability env mirrors
+ metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED")
+ otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED")
+ adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR")
+ regionEnv := os.Getenv("NEWT_REGION")
+ asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES")
+
keepInterface = keepInterfaceEnv == "true"
acceptClients = acceptClientsEnv == "true"
useNativeInterface = useNativeInterfaceEnv == "true"
@@ -272,6 +295,35 @@ func main() {
flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)")
}
+ // Metrics/observability flags (mirror ENV if unset)
+ if metricsEnabledEnv == "" {
+ flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter")
+ } else {
+ if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true }
+ }
+ if otlpEnabledEnv == "" {
+ flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT")
+ } else {
+ if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v }
+ }
+ if adminAddrEnv == "" {
+ flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address")
+ } else {
+ adminAddr = adminAddrEnv
+ }
+ // Async bytes toggle
+ if asyncBytesEnv == "" {
+ flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)")
+ } else {
+ if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v }
+ }
+ // Optional region flag (resource attribute)
+ if regionEnv == "" {
+ flag.StringVar(&region, "region", "", "Optional region resource attribute (also NEWT_REGION)")
+ } else {
+ region = regionEnv
+ }
+
// do a --version check
version := flag.Bool("version", false, "Print the version")
@@ -286,6 +338,50 @@ func main() {
loggerLevel := parseLogLevel(logLevel)
logger.GetLogger().SetLevel(parseLogLevel(logLevel))
+ // Initialize telemetry after flags are parsed so the values resolved above (env var if set, otherwise the flag) override the telemetry.FromEnv() config
+ tcfg := telemetry.FromEnv()
+ tcfg.PromEnabled = metricsEnabled
+ tcfg.OTLPEnabled = otlpEnabled
+ if adminAddr != "" { tcfg.AdminAddr = adminAddr }
+ // Resource attributes (if available)
+ tcfg.SiteID = id
+ tcfg.Region = region
+ // Build info
+ tcfg.BuildVersion = newtVersion
+ tcfg.BuildCommit = os.Getenv("NEWT_COMMIT")
+
+ tel, telErr := telemetry.Init(ctx, tcfg)
+ if telErr != nil {
+ logger.Warn("Telemetry init failed: %v", telErr)
+ }
+ if tel != nil {
+ // Admin HTTP server (exposes /metrics when Prometheus exporter is enabled)
+ mux := http.NewServeMux()
+ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) })
+ if tel.PrometheusHandler != nil {
+ mux.Handle("/metrics", tel.PrometheusHandler)
+ }
+ admin := &http.Server{
+ Addr: tcfg.AdminAddr,
+ Handler: otelhttp.NewHandler(mux, "newt-admin"),
+ ReadTimeout: 5 * time.Second,
+ WriteTimeout: 10 * time.Second,
+ ReadHeaderTimeout: 5 * time.Second,
+ IdleTimeout: 30 * time.Second,
+ }
+ go func() {
+ if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+ logger.Warn("admin http error: %v", err)
+ }
+ }()
+ defer func() {
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ _ = admin.Shutdown(ctx)
+ }()
+ defer func() { _ = tel.Shutdown(context.Background()) }()
+ }
+
newtVersion := "version_replaceme"
if *version {
fmt.Println("Newt version " + newtVersion)
@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
}
// Use reliable ping for initial connection test
logger.Debug("Testing initial connection with reliable ping...")
- _, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
+ lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
+ if err == nil && wgData.PublicKey != "" {
+ telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds())
+ }
if err != nil {
logger.Warn("Initial reliable ping failed, but continuing: %v", err)
} else {
@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
// as the pings will continue in the background
if !connected {
logger.Debug("Starting ping check")
- pingStopChan = startPingCheck(tnet, wgData.ServerIP, client)
+ pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey)
}
// Create proxy manager
pm = proxy.NewProxyManager(tnet)
+ pm.SetAsyncBytes(metricsAsyncBytes)
+ // Set tunnel_id for metrics (WireGuard peer public key)
+ pm.SetTunnelID(wgData.PublicKey)
connected = true
+ // telemetry: record a successful site registration (omit region unless available)
+ telemetry.IncSiteRegistration(context.Background(), id, "", "success")
+
// add the targets if there are any
if len(wgData.Targets.TCP) > 0 {
updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP})
@@ -611,10 +716,25 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) {
logger.Info("Received reconnect message")
+ if wgData.PublicKey != "" {
+ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
+ }
// Close the WireGuard device and TUN
closeWgTunnel()
+ // Clear metrics attrs and sessions for the tunnel
+ if pm != nil {
+ pm.ClearTunnelID()
+ state.Global().ClearTunnel(wgData.PublicKey)
+ }
+
+ // Clear metrics attrs and sessions for the tunnel
+ if pm != nil {
+ pm.ClearTunnelID()
+ state.Global().ClearTunnel(wgData.PublicKey)
+ }
+
// Mark as disconnected
connected = false
@@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) {
logger.Info("Received termination message")
+ if wgData.PublicKey != "" {
+ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
+ }
// Close the WireGuard device and TUN
closeWgTunnel()
diff --git a/util.go b/util.go
index 7d6da4f..c1f4915 100644
--- a/util.go
+++ b/util.go
@@ -17,6 +17,7 @@ import (
"github.com/fosrl/newt/logger"
"github.com/fosrl/newt/proxy"
"github.com/fosrl/newt/websocket"
+ "github.com/fosrl/newt/internal/telemetry"
"golang.org/x/net/icmp"
"golang.org/x/net/ipv4"
"golang.zx2c4.com/wireguard/device"
@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background")
}
-func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} {
+func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} {
maxInterval := 6 * time.Second
currentInterval := pingInterval
consecutiveFailures := 0
@@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
if !connectionLost {
connectionLost = true
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
+ if tunnelID != "" {
+ telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout)
+ }
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
// Send registration message to the server for backward compatibility
err := client.SendMessage("newt/wg/register", map[string]interface{}{
@@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
} else {
// Track recent latencies
recentLatencies = append(recentLatencies, latency)
+ // Record tunnel latency (limit sampling to this periodic check)
+ if tunnelID != "" {
+ telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds())
+ }
if len(recentLatencies) > 10 {
recentLatencies = recentLatencies[1:]
}
diff --git a/websocket/client.go b/websocket/client.go
index 0c0664a..c9ac264 100644
--- a/websocket/client.go
+++ b/websocket/client.go
@@ -18,6 +18,10 @@ import (
"github.com/fosrl/newt/logger"
"github.com/gorilla/websocket"
+
+ "context"
+ "github.com/fosrl/newt/internal/telemetry"
+ "go.opentelemetry.io/otel"
)
type Client struct {
@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) {
}
resp, err := client.Do(req)
if err != nil {
+ telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err))
return "", fmt.Errorf("failed to request new token: %w", err)
}
defer resp.Body.Close()
@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) {
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure")
+ bin := "http_other"
+ if resp.StatusCode >= 500 {
+ bin = "http_5xx"
+ } else if resp.StatusCode >= 400 {
+ bin = "http_4xx"
+ }
+ telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin)
+ // Reconnect reason mapping for auth failures
+ if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError)
+ }
return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
}
@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) {
}
logger.Debug("Received token: %s", tokenResp.Data.Token)
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success")
return tokenResp.Data.Token, nil
}
+// classifyConnError maps a connection error to a low-cardinality error_type
+// label for telemetry. It returns "" for a nil error and otherwise one of
+// "tls", "timeout", "dns", "auth", "io" or "other".
+//
+// Classification is a case-insensitive substring match on err.Error(). Cases
+// are evaluated top to bottom and the first match wins, so a message that
+// mentions both "tls" and "timeout" is reported as "tls".
+//
+// NOTE(review): the whole message is searched, so any text embedded in the
+// error (for example a host name containing "tls" or "dns") also matches;
+// confirm this is acceptable for the error sources passed in.
+func classifyConnError(err error) string {
+	if err == nil {
+		return ""
+	}
+	msg := strings.ToLower(err.Error())
+	switch {
+	case strings.Contains(msg, "tls"), strings.Contains(msg, "certificate"):
+		return "tls"
+	// "timeout" already covers "i/o timeout". "deadline exceeded" covers a
+	// bare context.DeadlineExceeded, whose text does not contain "timeout".
+	case strings.Contains(msg, "timeout"), strings.Contains(msg, "deadline exceeded"):
+		return "timeout"
+	case strings.Contains(msg, "no such host"), strings.Contains(msg, "dns"):
+		return "dns"
+	case strings.Contains(msg, "unauthorized"), strings.Contains(msg, "forbidden"):
+		return "auth"
+	case strings.Contains(msg, "broken pipe"),
+		strings.Contains(msg, "connection reset"),
+		strings.Contains(msg, "connection refused"),
+		strings.Contains(msg, "use of closed network connection"),
+		strings.Contains(msg, "network is unreachable"):
+		return "io"
+	default:
+		return "other"
+	}
+}
+
+
func (c *Client) connectWithRetry() {
for {
select {
@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error {
// Get token for authentication
token, err := c.getToken()
if err != nil {
+ // telemetry: connection attempt failed before dialing
+ // site_id isn't globally available here; use client ID as site_id (low cardinality)
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
+ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err))
return fmt.Errorf("failed to get token: %w", err)
}
@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error {
q.Set("clientType", c.clientType)
u.RawQuery = q.Encode()
- // Connect to WebSocket
+ // Connect to WebSocket (optional span)
+ tr := otel.Tracer("newt")
+ spanCtx, span := tr.Start(context.Background(), "ws.connect")
+ defer span.End()
+
dialer := websocket.DefaultDialer
// Use new TLS configuration method
@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error {
logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable")
}
- conn, _, err := dialer.Dial(u.String(), nil)
+conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
if err != nil {
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
+ etype := classifyConnError(err)
+ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype)
+ // Map handshake-related errors to reconnect reasons where appropriate
+ if etype == "tls" {
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError)
+ } else if etype == "timeout" {
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout)
+ } else {
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError)
+ }
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success")
c.conn = conn
c.setConnected(true)
diff --git a/wg/wg.go b/wg/wg.go
index 3cee1a9..a765279 100644
--- a/wg/wg.go
+++ b/wg/wg.go
@@ -3,6 +3,7 @@
package wg
import (
+ "context"
"encoding/json"
"errors"
"fmt"
@@ -23,6 +24,8 @@ import (
"golang.zx2c4.com/wireguard/conn"
"golang.zx2c4.com/wireguard/wgctrl"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+
+ "github.com/fosrl/newt/internal/telemetry"
)
type WgConfig struct {
@@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
s.stopGetConfig = nil
}
+ // telemetry: config reload success
+ telemetry.IncConfigReload(context.Background(), "success")
+ // Optional reconnect reason mapping: config change
+ if s.serverPubKey != "" {
+ telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange)
+ }
+
// Ensure the WireGuard interface and peers are configured
if err := s.ensureWireguardInterface(config); err != nil {
logger.Error("Failed to ensure WireGuard interface: %v", err)
diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go
index 6684c40..09f160e 100644
--- a/wgnetstack/wgnetstack.go
+++ b/wgnetstack/wgnetstack.go
@@ -1,6 +1,7 @@
package wgnetstack
import (
+ "context"
"crypto/rand"
"encoding/base64"
"encoding/hex"
@@ -26,6 +27,8 @@ import (
"golang.zx2c4.com/wireguard/tun"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
+
+ "github.com/fosrl/newt/internal/telemetry"
)
type WgConfig struct {
@@ -240,14 +243,20 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str
return service, nil
}
+// ReportRTT records one round-trip-time sample, in seconds, as a tunnel
+// latency observation for this service's server peer.
+//
+// The sample is tagged with the server public key and the "wireguard"
+// transport; the site ID argument is left empty. The call is a no-op while
+// no server public key is known. ReportRTT does no throttling of its own, so
+// callers are expected to rate-limit how often they invoke it.
+func (s *WireGuardService) ReportRTT(seconds float64) {
+	if s.serverPubKey == "" {
+		return
+	}
+	telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds)
+}
+
func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) {
logger.Debug("Received: %+v", msg)
// if there is no wgData or pm, we can't add targets
if s.TunnelIP == "" || s.proxyManager == nil {
logger.Info("No tunnel IP or proxy manager available")
- return
- }
+ return
+}
targetData, err := parseTargetData(msg.Data)
if err != nil {