# REVIEW NOTES - reviewer preamble. `git apply` and `patch` skip any text that
# precedes the first "diff --git" header, so these lines do not affect the patch.
#
# Patch integrity
# - Line breaks, leading tabs and blank context lines have been collapsed away, so
#   this text cannot be applied as-is (e.g. the "@@ -1,7 +1,9 @@" hunk shows only
#   six old lines). Regenerate it with `git diff` / `git format-patch`; do not
#   hand-edit hunks.
# - main.go: `flag.StringVar(®ion, ...)` - the "&reg" of "&region" appears to have
#   been decoded as the HTML entity for the registered sign. The intended code is
#   `flag.StringVar(&region, "region", ...)`.
# - Imports: the main.go and util.go import hunks add only context/errors and
#   telemetry-related packages. Confirm that main.go already imports os/signal,
#   syscall, strconv, net/http and time; that util.go already imports context; and
#   that websocket/client.go already imports strings (used by classifyConnError).
#
# main.go
# - The telemetry block reads newtVersion (`tcfg.BuildVersion = newtVersion`) but is
#   inserted before the context line `newtVersion := "version_replaceme"`. Unless a
#   package-level newtVersion exists (not visible here), this does not compile.
#   Move the telemetry/admin-server setup below the --version check. If that branch
#   exits, this also stops `newt --version` from starting the admin server.
# - "Initialize telemetry after flags are parsed (so flags override env)" does not
#   match the wiring. When NEWT_METRICS_PROMETHEUS_ENABLED,
#   NEWT_METRICS_OTLP_ENABLED, NEWT_ADMIN_ADDR, NEWT_METRICS_ASYNC_BYTES or
#   NEWT_REGION is set, the matching flag is never registered. The env value
#   therefore wins, and passing that flag is rejected by flag.Parse
#   ("flag provided but not defined").
# - Unparseable boolean env values are silently ignored (the default is kept);
#   consider logging a warning.
# - `if v, err := ...; err == nil { ... } else { ... }` and
#   `if adminAddr != "" { ... }` written on one line are not gofmt output; run gofmt.
# - /healthz calls w.WriteHeader(200); prefer http.StatusOK.
# - The admin-shutdown defer declares `ctx, cancel := context.WithTimeout(...)`,
#   shadowing the signal.NotifyContext ctx; rename it (e.g. shutdownCtx). These
#   deferred shutdowns only run if main returns; os.Exit skips them.
# - The admin handler is wrapped in otelhttp.NewHandler, so every /metrics scrape is
#   itself traced/measured. Consider wrapping only the non-metrics routes.
# - `lat, err := reliablePing(...)` replaces `_, err = ...`. If err is declared in
#   an enclosing scope, this now declares a new err here. That is harmless if err is
#   only checked just below - confirm.
# - "newt/wg/reconnect" handler: the "Clear metrics attrs and sessions for the
#   tunnel" block is added twice; drop the second copy.
# - "newt/wg/terminate" handler: records IncReconnect with the literal reason
#   "server_request" (same as the reconnect handler), while other call sites use
#   telemetry.Reason* constants. A terminate is arguably not a reconnect.
# - Empty labels:
#   - IncSiteRegistration passes "" for region, although `region` is populated
#     from --region / NEWT_REGION.
#   - The IncReconnect / ObserveTunnelLatency calls in main.go and util.go pass ""
#     as their first label (presumably site_id - confirm against
#     internal/telemetry), although the package-level `id` appears to be available.
#
# websocket/client.go
# - The stdlib "context" import is grouped with third-party imports; move it into
#   the stdlib group.
# - getToken counting is inconsistent:
#   - a transport error records only IncConnError;
#   - a non-200 response records IncConnAttempt("failure") + IncConnError;
#   - establishConnection then records a second, websocket-phase attempt/error for
#     the same failure.
#   Decide which layer owns each count.
# - getToken: on 401/403, IncReconnect receives c.config.ID as its tunnel label,
#   whereas main.go/util.go pass a WireGuard public key in that position.
# - getToken (pre-existing): logger.Debug("Received token: %s", ...) writes the
#   bearer token to the debug log.
# - classifyConnError:
#   - `strings.Contains(msg, "i/o timeout")` is redundant after the "timeout" test.
#   - Because "tls" is tested first, a "TLS handshake timeout" is labelled "tls".
#   - Prefer structured checks first (errors.Is(err, context.DeadlineExceeded),
#     errors.As to net.Error + Timeout(), errors.As to *net.DNSError), then fall
#     back to message matching.
# - establishConnection:
#   - `conn, _, err := dialer.DialContext(...)` lost its indentation; run gofmt.
#   - The "ws.connect" span is ended by a function-level defer, so it covers the
#     rest of establishConnection, not just the dial.
#   - The span never records the dial error (span.RecordError / span.SetStatus).
# - Every dial failure increments IncReconnect, presumably including the very first
#   connection attempt - confirm that is intended.
# - NOTE(review, pre-existing): `dialer := websocket.DefaultDialer` copies a
#   pointer. If the TLS setup that follows assigns dialer fields, it mutates
#   gorilla's shared DefaultDialer. Copy the struct instead
#   (d := *websocket.DefaultDialer).
#
# wg/wg.go
# - handleConfig records IncConfigReload("success") before ensureWireguardInterface
#   runs. A config that then fails ("Failed to ensure WireGuard interface") is still
#   counted as a success. Record the outcome after the config is applied.
# - Each handled config also increments IncReconnect(ReasonConfigChange) whenever
#   serverPubKey is set. Confirm that every config message is meant to count as a
#   reconnect.
diff --git a/main.go b/main.go index 12849b1..c223b75 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,9 @@ package main import ( + "context" "encoding/json" + "errors" "flag" "fmt" "net" @@ -22,6 +24,9 @@ import ( "github.com/fosrl/newt/updates" "github.com/fosrl/newt/websocket" + "github.com/fosrl/newt/internal/state" + "github.com/fosrl/newt/internal/telemetry" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "golang.zx2c4.com/wireguard/conn" "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/tun" @@ -116,6 +121,13 @@ var ( healthMonitor *healthcheck.Monitor enforceHealthcheckCert bool + // Observability/metrics flags + metricsEnabled bool + otlpEnabled bool + adminAddr string + region string + metricsAsyncBytes bool + // New mTLS configuration variables tlsClientCert string tlsClientKey string @@ -126,6 +138,10 @@ var ( ) func main() { + // Prepare context for graceful shutdown and signal handling + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + // if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values endpoint = os.Getenv("PANGOLIN_ENDPOINT") id = os.Getenv("NEWT_ID") @@ -141,6 +157,13 @@ func main() { useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE") enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT") + // Metrics/observability env mirrors + metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED") + otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED") + adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") + regionEnv := os.Getenv("NEWT_REGION") + asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") + keepInterface = keepInterfaceEnv == "true" acceptClients = acceptClientsEnv == "true" useNativeInterface = useNativeInterfaceEnv == "true" @@ -272,6 +295,35 @@ func main() { flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)") } + // 
Metrics/observability flags (mirror ENV if unset) + if metricsEnabledEnv == "" { + flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter") + } else { + if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true } + } + if otlpEnabledEnv == "" { + flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT") + } else { + if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v } + } + if adminAddrEnv == "" { + flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address") + } else { + adminAddr = adminAddrEnv + } + // Async bytes toggle + if asyncBytesEnv == "" { + flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)") + } else { + if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v } + } + // Optional region flag (resource attribute) + if regionEnv == "" { + flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") + } else { + region = regionEnv + } + // do a --version check version := flag.Bool("version", false, "Print the version") @@ -286,6 +338,50 @@ func main() { loggerLevel := parseLogLevel(logLevel) logger.GetLogger().SetLevel(parseLogLevel(logLevel)) + // Initialize telemetry after flags are parsed (so flags override env) + tcfg := telemetry.FromEnv() + tcfg.PromEnabled = metricsEnabled + tcfg.OTLPEnabled = otlpEnabled + if adminAddr != "" { tcfg.AdminAddr = adminAddr } + // Resource attributes (if available) + tcfg.SiteID = id + tcfg.Region = region + // Build info + tcfg.BuildVersion = newtVersion + tcfg.BuildCommit = os.Getenv("NEWT_COMMIT") + + tel, telErr := telemetry.Init(ctx, tcfg) + if telErr != nil { + logger.Warn("Telemetry init failed: %v", telErr) + } + if tel != nil { + // Admin HTTP server (exposes /metrics 
when Prometheus exporter is enabled) + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) + if tel.PrometheusHandler != nil { + mux.Handle("/metrics", tel.PrometheusHandler) + } + admin := &http.Server{ + Addr: tcfg.AdminAddr, + Handler: otelhttp.NewHandler(mux, "newt-admin"), + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + } + go func() { + if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Warn("admin http error: %v", err) + } + }() + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = admin.Shutdown(ctx) + }() + defer func() { _ = tel.Shutdown(context.Background()) }() + } + newtVersion := "version_replaceme" if *version { fmt.Println("Newt version " + newtVersion) @@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub } // Use reliable ping for initial connection test logger.Debug("Testing initial connection with reliable ping...") - _, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) + lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) + if err == nil && wgData.PublicKey != "" { + telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds()) + } if err != nil { logger.Warn("Initial reliable ping failed, but continuing: %v", err) } else { @@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub // as the pings will continue in the background if !connected { logger.Debug("Starting ping check") - pingStopChan = startPingCheck(tnet, wgData.ServerIP, client) + pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey) } // Create proxy manager pm = proxy.NewProxyManager(tnet) + pm.SetAsyncBytes(metricsAsyncBytes) + // Set 
tunnel_id for metrics (WireGuard peer public key) + pm.SetTunnelID(wgData.PublicKey) connected = true + // telemetry: record a successful site registration (omit region unless available) + telemetry.IncSiteRegistration(context.Background(), id, "", "success") + // add the targets if there are any if len(wgData.Targets.TCP) > 0 { updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) @@ -611,10 +716,25 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { logger.Info("Received reconnect message") + if wgData.PublicKey != "" { + telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") + } // Close the WireGuard device and TUN closeWgTunnel() + // Clear metrics attrs and sessions for the tunnel + if pm != nil { + pm.ClearTunnelID() + state.Global().ClearTunnel(wgData.PublicKey) + } + + // Clear metrics attrs and sessions for the tunnel + if pm != nil { + pm.ClearTunnelID() + state.Global().ClearTunnel(wgData.PublicKey) + } + // Mark as disconnected connected = false @@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { logger.Info("Received termination message") + if wgData.PublicKey != "" { + telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") + } // Close the WireGuard device and TUN closeWgTunnel() diff --git a/util.go b/util.go index 7d6da4f..c1f4915 100644 --- a/util.go +++ b/util.go @@ -17,6 +17,7 @@ import ( "github.com/fosrl/newt/logger" "github.com/fosrl/newt/proxy" "github.com/fosrl/newt/websocket" + "github.com/fosrl/newt/internal/telemetry" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" "golang.zx2c4.com/wireguard/device" @@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC return 
stopChan, fmt.Errorf("initial ping attempts failed, continuing in background") } -func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} { +func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} { maxInterval := 6 * time.Second currentInterval := pingInterval consecutiveFailures := 0 @@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien if !connectionLost { connectionLost = true logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) + if tunnelID != "" { + telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) + } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility err := client.SendMessage("newt/wg/register", map[string]interface{}{ @@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien } else { // Track recent latencies recentLatencies = append(recentLatencies, latency) + // Record tunnel latency (limit sampling to this periodic check) + if tunnelID != "" { + telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) + } if len(recentLatencies) > 10 { recentLatencies = recentLatencies[1:] } diff --git a/websocket/client.go b/websocket/client.go index 0c0664a..c9ac264 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -18,6 +18,10 @@ import ( "github.com/fosrl/newt/logger" "github.com/gorilla/websocket" + + "context" + "github.com/fosrl/newt/internal/telemetry" + "go.opentelemetry.io/otel" ) type Client struct { @@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) { } resp, err := client.Do(req) if err != nil { + telemetry.IncConnError(context.Background(), c.config.ID, "auth", 
classifyConnError(err)) return "", fmt.Errorf("failed to request new token: %w", err) } defer resp.Body.Close() @@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") + bin := "http_other" + if resp.StatusCode >= 500 { + bin = "http_5xx" + } else if resp.StatusCode >= 400 { + bin = "http_4xx" + } + telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) + // Reconnect reason mapping for auth failures + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) + } return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) } @@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) { } logger.Debug("Received token: %s", tokenResp.Data.Token) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") return tokenResp.Data.Token, nil } +// classifyConnError maps common errors to low-cardinality error_type labels +func classifyConnError(err error) string { + if err == nil { + return "" + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): + return "tls" + case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): + return "timeout" + case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"): + return "dns" + case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): + return "auth" + case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network 
connection") || strings.Contains(msg, "network is unreachable"): + return "io" + default: + return "other" + } +} + func (c *Client) connectWithRetry() { for { select { @@ -337,6 +377,10 @@ func (c *Client) establishConnection() error { // Get token for authentication token, err := c.getToken() if err != nil { + // telemetry: connection attempt failed before dialing + // site_id isn't globally available here; use client ID as site_id (low cardinality) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") + telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) return fmt.Errorf("failed to get token: %w", err) } @@ -369,7 +413,11 @@ func (c *Client) establishConnection() error { q.Set("clientType", c.clientType) u.RawQuery = q.Encode() - // Connect to WebSocket + // Connect to WebSocket (optional span) + tr := otel.Tracer("newt") + spanCtx, span := tr.Start(context.Background(), "ws.connect") + defer span.End() + dialer := websocket.DefaultDialer // Use new TLS configuration method @@ -391,11 +439,23 @@ func (c *Client) establishConnection() error { logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") } - conn, _, err := dialer.Dial(u.String(), nil) +conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) if err != nil { + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") + etype := classifyConnError(err) + telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) + // Map handshake-related errors to reconnect reasons where appropriate + if etype == "tls" { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) + } else if etype == "timeout" { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) + } else { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) + } return 
fmt.Errorf("failed to connect to WebSocket: %w", err) } + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") c.conn = conn c.setConnected(true) diff --git a/wg/wg.go b/wg/wg.go index 3cee1a9..a765279 100644 --- a/wg/wg.go +++ b/wg/wg.go @@ -3,6 +3,7 @@ package wg import ( + "context" "encoding/json" "errors" "fmt" @@ -23,6 +24,8 @@ import ( "golang.zx2c4.com/wireguard/conn" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/fosrl/newt/internal/telemetry" ) type WgConfig struct { @@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { s.stopGetConfig = nil } + // telemetry: config reload success + telemetry.IncConfigReload(context.Background(), "success") + // Optional reconnect reason mapping: config change + if s.serverPubKey != "" { + telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) + } + // Ensure the WireGuard interface and peers are configured if err := s.ensureWireguardInterface(config); err != nil { logger.Error("Failed to ensure WireGuard interface: %v", err)