mirror of
https://github.com/fosrl/newt.git
synced 2026-03-26 20:46:41 +00:00
feat: Implement telemetry for reconnect reasons and RTT reporting
- Added telemetry hooks to track reconnect reasons for WireGuard connections, including server requests and authentication errors. - Introduced RTT reporting to telemetry for better latency monitoring. - Enhanced metrics configuration with flags for Prometheus and OTLP exporters. - Implemented graceful shutdown and signal handling in the main application. - Updated WebSocket client to classify connection errors and report them to telemetry. - Added support for async byte counting in metrics. - Improved handling of reconnect scenarios in the WireGuard service. - Added documentation for applying patches and rollback procedures.
This commit is contained in:
422
patches/02_reconnect_reasons.patch
Normal file
422
patches/02_reconnect_reasons.patch
Normal file
@@ -0,0 +1,422 @@
|
||||
diff --git a/main.go b/main.go
|
||||
index 12849b1..c223b75 100644
|
||||
--- a/main.go
|
||||
+++ b/main.go
|
||||
@@ -1,7 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
+ "context"
|
||||
"encoding/json"
|
||||
+ "errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -22,6 +24,9 @@ import (
|
||||
"github.com/fosrl/newt/updates"
|
||||
"github.com/fosrl/newt/websocket"
|
||||
|
||||
+ "github.com/fosrl/newt/internal/state"
|
||||
+ "github.com/fosrl/newt/internal/telemetry"
|
||||
+ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"golang.zx2c4.com/wireguard/conn"
|
||||
"golang.zx2c4.com/wireguard/device"
|
||||
"golang.zx2c4.com/wireguard/tun"
|
||||
@@ -116,6 +121,13 @@ var (
|
||||
healthMonitor *healthcheck.Monitor
|
||||
enforceHealthcheckCert bool
|
||||
|
||||
+ // Observability/metrics flags
|
||||
+ metricsEnabled bool
|
||||
+ otlpEnabled bool
|
||||
+ adminAddr string
|
||||
+ region string
|
||||
+ metricsAsyncBytes bool
|
||||
+
|
||||
// New mTLS configuration variables
|
||||
tlsClientCert string
|
||||
tlsClientKey string
|
||||
@@ -126,6 +138,10 @@ var (
|
||||
)
|
||||
|
||||
func main() {
|
||||
+ // Prepare context for graceful shutdown and signal handling
|
||||
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
|
||||
+ defer stop()
|
||||
+
|
||||
// if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values
|
||||
endpoint = os.Getenv("PANGOLIN_ENDPOINT")
|
||||
id = os.Getenv("NEWT_ID")
|
||||
@@ -141,6 +157,13 @@ func main() {
|
||||
useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE")
|
||||
enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT")
|
||||
|
||||
+ // Metrics/observability env mirrors
|
||||
+ metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED")
|
||||
+ otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED")
|
||||
+ adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR")
|
||||
+ regionEnv := os.Getenv("NEWT_REGION")
|
||||
+ asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES")
|
||||
+
|
||||
keepInterface = keepInterfaceEnv == "true"
|
||||
acceptClients = acceptClientsEnv == "true"
|
||||
useNativeInterface = useNativeInterfaceEnv == "true"
|
||||
@@ -272,6 +295,35 @@ func main() {
|
||||
flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)")
|
||||
}
|
||||
|
||||
+ // Metrics/observability flags (mirror ENV if unset)
|
||||
+ if metricsEnabledEnv == "" {
|
||||
+ flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter")
|
||||
+ } else {
|
||||
+ if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true }
|
||||
+ }
|
||||
+ if otlpEnabledEnv == "" {
|
||||
+ flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT")
|
||||
+ } else {
|
||||
+ if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v }
|
||||
+ }
|
||||
+ if adminAddrEnv == "" {
|
||||
+ flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address")
|
||||
+ } else {
|
||||
+ adminAddr = adminAddrEnv
|
||||
+ }
|
||||
+ // Async bytes toggle
|
||||
+ if asyncBytesEnv == "" {
|
||||
+ flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)")
|
||||
+ } else {
|
||||
+ if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v }
|
||||
+ }
|
||||
+ // Optional region flag (resource attribute)
|
||||
+ if regionEnv == "" {
|
||||
+ flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)")
|
||||
+ } else {
|
||||
+ region = regionEnv
|
||||
+ }
|
||||
+
|
||||
// do a --version check
|
||||
version := flag.Bool("version", false, "Print the version")
|
||||
|
||||
@@ -286,6 +338,50 @@ func main() {
|
||||
loggerLevel := parseLogLevel(logLevel)
|
||||
logger.GetLogger().SetLevel(parseLogLevel(logLevel))
|
||||
|
||||
+ // Initialize telemetry after flags are parsed (so flags override env)
|
||||
+ tcfg := telemetry.FromEnv()
|
||||
+ tcfg.PromEnabled = metricsEnabled
|
||||
+ tcfg.OTLPEnabled = otlpEnabled
|
||||
+ if adminAddr != "" { tcfg.AdminAddr = adminAddr }
|
||||
+ // Resource attributes (if available)
|
||||
+ tcfg.SiteID = id
|
||||
+ tcfg.Region = region
|
||||
+ // Build info
|
||||
+ tcfg.BuildVersion = newtVersion
|
||||
+ tcfg.BuildCommit = os.Getenv("NEWT_COMMIT")
|
||||
+
|
||||
+ tel, telErr := telemetry.Init(ctx, tcfg)
|
||||
+ if telErr != nil {
|
||||
+ logger.Warn("Telemetry init failed: %v", telErr)
|
||||
+ }
|
||||
+ if tel != nil {
|
||||
+ // Admin HTTP server (exposes /metrics when Prometheus exporter is enabled)
|
||||
+ mux := http.NewServeMux()
|
||||
+ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) })
|
||||
+ if tel.PrometheusHandler != nil {
|
||||
+ mux.Handle("/metrics", tel.PrometheusHandler)
|
||||
+ }
|
||||
+ admin := &http.Server{
|
||||
+ Addr: tcfg.AdminAddr,
|
||||
+ Handler: otelhttp.NewHandler(mux, "newt-admin"),
|
||||
+ ReadTimeout: 5 * time.Second,
|
||||
+ WriteTimeout: 10 * time.Second,
|
||||
+ ReadHeaderTimeout: 5 * time.Second,
|
||||
+ IdleTimeout: 30 * time.Second,
|
||||
+ }
|
||||
+ go func() {
|
||||
+ if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
+ logger.Warn("admin http error: %v", err)
|
||||
+ }
|
||||
+ }()
|
||||
+ defer func() {
|
||||
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
+ defer cancel()
|
||||
+ _ = admin.Shutdown(ctx)
|
||||
+ }()
|
||||
+ defer func() { _ = tel.Shutdown(context.Background()) }()
|
||||
+ }
|
||||
+
|
||||
newtVersion := "version_replaceme"
|
||||
if *version {
|
||||
fmt.Println("Newt version " + newtVersion)
|
||||
@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
}
|
||||
// Use reliable ping for initial connection test
|
||||
logger.Debug("Testing initial connection with reliable ping...")
|
||||
- _, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
|
||||
+ lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
|
||||
+ if err == nil && wgData.PublicKey != "" {
|
||||
+ telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds())
|
||||
+ }
|
||||
if err != nil {
|
||||
logger.Warn("Initial reliable ping failed, but continuing: %v", err)
|
||||
} else {
|
||||
@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
// as the pings will continue in the background
|
||||
if !connected {
|
||||
logger.Debug("Starting ping check")
|
||||
- pingStopChan = startPingCheck(tnet, wgData.ServerIP, client)
|
||||
+ pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey)
|
||||
}
|
||||
|
||||
// Create proxy manager
|
||||
pm = proxy.NewProxyManager(tnet)
|
||||
+ pm.SetAsyncBytes(metricsAsyncBytes)
|
||||
+ // Set tunnel_id for metrics (WireGuard peer public key)
|
||||
+ pm.SetTunnelID(wgData.PublicKey)
|
||||
|
||||
connected = true
|
||||
|
||||
+ // telemetry: record a successful site registration (omit region unless available)
|
||||
+ telemetry.IncSiteRegistration(context.Background(), id, "", "success")
|
||||
+
|
||||
// add the targets if there are any
|
||||
if len(wgData.Targets.TCP) > 0 {
|
||||
updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP})
|
||||
@@ -611,10 +716,25 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
|
||||
client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received reconnect message")
|
||||
+ if wgData.PublicKey != "" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
|
||||
+ }
|
||||
|
||||
// Close the WireGuard device and TUN
|
||||
closeWgTunnel()
|
||||
|
||||
+ // Clear metrics attrs and sessions for the tunnel
|
||||
+ if pm != nil {
|
||||
+ pm.ClearTunnelID()
|
||||
+ state.Global().ClearTunnel(wgData.PublicKey)
|
||||
+ }
|
||||
+
|
||||
+ // Clear metrics attrs and sessions for the tunnel
|
||||
+ if pm != nil {
|
||||
+ pm.ClearTunnelID()
|
||||
+ state.Global().ClearTunnel(wgData.PublicKey)
|
||||
+ }
|
||||
+
|
||||
// Mark as disconnected
|
||||
connected = false
|
||||
|
||||
@@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
|
||||
client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received termination message")
|
||||
+ if wgData.PublicKey != "" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
|
||||
+ }
|
||||
|
||||
// Close the WireGuard device and TUN
|
||||
closeWgTunnel()
|
||||
diff --git a/util.go b/util.go
|
||||
index 7d6da4f..c1f4915 100644
|
||||
--- a/util.go
|
||||
+++ b/util.go
|
||||
@@ -17,6 +17,7 @@ import (
|
||||
"github.com/fosrl/newt/logger"
|
||||
"github.com/fosrl/newt/proxy"
|
||||
"github.com/fosrl/newt/websocket"
|
||||
+ "github.com/fosrl/newt/internal/telemetry"
|
||||
"golang.org/x/net/icmp"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.zx2c4.com/wireguard/device"
|
||||
@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
|
||||
return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background")
|
||||
}
|
||||
|
||||
-func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} {
|
||||
+func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} {
|
||||
maxInterval := 6 * time.Second
|
||||
currentInterval := pingInterval
|
||||
consecutiveFailures := 0
|
||||
@@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
|
||||
if !connectionLost {
|
||||
connectionLost = true
|
||||
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
|
||||
+ if tunnelID != "" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout)
|
||||
+ }
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
|
||||
// Send registration message to the server for backward compatibility
|
||||
err := client.SendMessage("newt/wg/register", map[string]interface{}{
|
||||
@@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
|
||||
} else {
|
||||
// Track recent latencies
|
||||
recentLatencies = append(recentLatencies, latency)
|
||||
+ // Record tunnel latency (limit sampling to this periodic check)
|
||||
+ if tunnelID != "" {
|
||||
+ telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds())
|
||||
+ }
|
||||
if len(recentLatencies) > 10 {
|
||||
recentLatencies = recentLatencies[1:]
|
||||
}
|
||||
diff --git a/websocket/client.go b/websocket/client.go
|
||||
index 0c0664a..c9ac264 100644
|
||||
--- a/websocket/client.go
|
||||
+++ b/websocket/client.go
|
||||
@@ -18,6 +18,10 @@ import (
|
||||
|
||||
"github.com/fosrl/newt/logger"
|
||||
"github.com/gorilla/websocket"
|
||||
+
|
||||
+ "context"
|
||||
+ "github.com/fosrl/newt/internal/telemetry"
|
||||
+ "go.opentelemetry.io/otel"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) {
|
||||
}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
+ telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err))
|
||||
return "", fmt.Errorf("failed to request new token: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) {
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
|
||||
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure")
|
||||
+ bin := "http_other"
|
||||
+ if resp.StatusCode >= 500 {
|
||||
+ bin = "http_5xx"
|
||||
+ } else if resp.StatusCode >= 400 {
|
||||
+ bin = "http_4xx"
|
||||
+ }
|
||||
+ telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin)
|
||||
+ // Reconnect reason mapping for auth failures
|
||||
+ if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
|
||||
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError)
|
||||
+ }
|
||||
return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) {
|
||||
}
|
||||
|
||||
logger.Debug("Received token: %s", tokenResp.Data.Token)
|
||||
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success")
|
||||
|
||||
return tokenResp.Data.Token, nil
|
||||
}
|
||||
|
||||
+// classifyConnError maps common errors to low-cardinality error_type labels
|
||||
+func classifyConnError(err error) string {
|
||||
+ if err == nil {
|
||||
+ return ""
|
||||
+ }
|
||||
+ msg := strings.ToLower(err.Error())
|
||||
+ switch {
|
||||
+ case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"):
|
||||
+ return "tls"
|
||||
+ case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"):
|
||||
+ return "timeout"
|
||||
+ case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"):
|
||||
+ return "dns"
|
||||
+ case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"):
|
||||
+ return "auth"
|
||||
+ case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"):
|
||||
+ return "io"
|
||||
+ default:
|
||||
+ return "other"
|
||||
+ }
|
||||
+}
|
||||
+
|
||||
func (c *Client) connectWithRetry() {
|
||||
for {
|
||||
select {
|
||||
@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error {
|
||||
// Get token for authentication
|
||||
token, err := c.getToken()
|
||||
if err != nil {
|
||||
+ // telemetry: connection attempt failed before dialing
|
||||
+ // site_id isn't globally available here; use client ID as site_id (low cardinality)
|
||||
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
|
||||
+ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err))
|
||||
return fmt.Errorf("failed to get token: %w", err)
|
||||
}
|
||||
|
||||
@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error {
|
||||
q.Set("clientType", c.clientType)
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
- // Connect to WebSocket
|
||||
+ // Connect to WebSocket (optional span)
|
||||
+ tr := otel.Tracer("newt")
|
||||
+ spanCtx, span := tr.Start(context.Background(), "ws.connect")
|
||||
+ defer span.End()
|
||||
+
|
||||
dialer := websocket.DefaultDialer
|
||||
|
||||
// Use new TLS configuration method
|
||||
@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error {
|
||||
logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable")
|
||||
}
|
||||
|
||||
- conn, _, err := dialer.Dial(u.String(), nil)
|
||||
+conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
|
||||
if err != nil {
|
||||
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
|
||||
+ etype := classifyConnError(err)
|
||||
+ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype)
|
||||
+ // Map handshake-related errors to reconnect reasons where appropriate
|
||||
+ if etype == "tls" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError)
|
||||
+ } else if etype == "timeout" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout)
|
||||
+ } else {
|
||||
+ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError)
|
||||
+ }
|
||||
return fmt.Errorf("failed to connect to WebSocket: %w", err)
|
||||
}
|
||||
|
||||
+ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success")
|
||||
c.conn = conn
|
||||
c.setConnected(true)
|
||||
|
||||
diff --git a/wg/wg.go b/wg/wg.go
|
||||
index 3cee1a9..a765279 100644
|
||||
--- a/wg/wg.go
|
||||
+++ b/wg/wg.go
|
||||
@@ -3,6 +3,7 @@
|
||||
package wg
|
||||
|
||||
import (
|
||||
+ "context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -23,6 +24,8 @@ import (
|
||||
"golang.zx2c4.com/wireguard/conn"
|
||||
"golang.zx2c4.com/wireguard/wgctrl"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
+
|
||||
+ "github.com/fosrl/newt/internal/telemetry"
|
||||
)
|
||||
|
||||
type WgConfig struct {
|
||||
@@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
|
||||
s.stopGetConfig = nil
|
||||
}
|
||||
|
||||
+ // telemetry: config reload success
|
||||
+ telemetry.IncConfigReload(context.Background(), "success")
|
||||
+ // Optional reconnect reason mapping: config change
|
||||
+ if s.serverPubKey != "" {
|
||||
+ telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange)
|
||||
+ }
|
||||
+
|
||||
// Ensure the WireGuard interface and peers are configured
|
||||
if err := s.ensureWireguardInterface(config); err != nil {
|
||||
logger.Error("Failed to ensure WireGuard interface: %v", err)
|
||||
Reference in New Issue
Block a user