diff --git a/Dockerfile b/Dockerfile
index b9c4d29..b9b6dea 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -22,6 +22,9 @@ RUN apk --no-cache add ca-certificates tzdata
 COPY --from=builder /newt /usr/local/bin/
 COPY entrypoint.sh /
+# Admin/metrics endpoint (Prometheus scrape)
+EXPOSE 2112
+
 RUN chmod +x /entrypoint.sh
 
 ENTRYPOINT ["/entrypoint.sh"]
-CMD ["newt"]
\ No newline at end of file
+CMD ["newt"]
diff --git a/go.mod b/go.mod
index d475835..5909955 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,14 @@ require (
 	github.com/google/gopacket v1.1.19
 	github.com/gorilla/websocket v1.5.3
 	github.com/vishvananda/netlink v1.3.1
+	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0
+	go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0
+	go.opentelemetry.io/otel v1.37.0
+	go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0
+	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0
+	go.opentelemetry.io/otel/exporters/prometheus v0.59.0
+	go.opentelemetry.io/otel/sdk v1.37.0
+	go.opentelemetry.io/otel/sdk/metric v1.37.0
 	golang.org/x/crypto v0.42.0
 	golang.org/x/exp v0.0.0-20250718183923-645b1fa84792
 	golang.org/x/net v0.44.0
diff --git a/main.go b/main.go
index 12849b1..c223b75 100644
--- a/main.go
+++ b/main.go
@@ -1,7 +1,9 @@
 package main
 
 import (
+	"context"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"net"
@@ -22,6 +24,9 @@ import (
 	"github.com/fosrl/newt/updates"
 	"github.com/fosrl/newt/websocket"
 
+	"github.com/fosrl/newt/internal/state"
+	"github.com/fosrl/newt/internal/telemetry"
+	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
 	"golang.zx2c4.com/wireguard/conn"
 	"golang.zx2c4.com/wireguard/device"
 	"golang.zx2c4.com/wireguard/tun"
@@ -116,6 +121,13 @@ var (
 	healthMonitor          *healthcheck.Monitor
 	enforceHealthcheckCert bool
 
+	// Observability/metrics flags
+	metricsEnabled    bool
+	otlpEnabled       bool
+	adminAddr         string
+	region            string
+	metricsAsyncBytes bool
+
 	// New mTLS configuration variables
 	tlsClientCert string
tlsClientKey string @@ -126,6 +138,10 @@ var ( ) func main() { + // Prepare context for graceful shutdown and signal handling + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() + // if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values endpoint = os.Getenv("PANGOLIN_ENDPOINT") id = os.Getenv("NEWT_ID") @@ -141,6 +157,13 @@ func main() { useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE") enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT") + // Metrics/observability env mirrors + metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED") + otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED") + adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") + regionEnv := os.Getenv("NEWT_REGION") + asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") + keepInterface = keepInterfaceEnv == "true" acceptClients = acceptClientsEnv == "true" useNativeInterface = useNativeInterfaceEnv == "true" @@ -272,6 +295,35 @@ func main() { flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)") } + // Metrics/observability flags (mirror ENV if unset) + if metricsEnabledEnv == "" { + flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter") + } else { + if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true } + } + if otlpEnabledEnv == "" { + flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT") + } else { + if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v } + } + if adminAddrEnv == "" { + flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address") + } else { + adminAddr = adminAddrEnv + } + // Async bytes toggle + if asyncBytesEnv == "" { + flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", 
false, "Enable async bytes counting (background flush; lower hot path overhead)") + } else { + if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v } + } + // Optional region flag (resource attribute) + if regionEnv == "" { + flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") + } else { + region = regionEnv + } + // do a --version check version := flag.Bool("version", false, "Print the version") @@ -286,6 +338,50 @@ func main() { loggerLevel := parseLogLevel(logLevel) logger.GetLogger().SetLevel(parseLogLevel(logLevel)) + // Initialize telemetry after flags are parsed (so flags override env) + tcfg := telemetry.FromEnv() + tcfg.PromEnabled = metricsEnabled + tcfg.OTLPEnabled = otlpEnabled + if adminAddr != "" { tcfg.AdminAddr = adminAddr } + // Resource attributes (if available) + tcfg.SiteID = id + tcfg.Region = region + // Build info + tcfg.BuildVersion = newtVersion + tcfg.BuildCommit = os.Getenv("NEWT_COMMIT") + + tel, telErr := telemetry.Init(ctx, tcfg) + if telErr != nil { + logger.Warn("Telemetry init failed: %v", telErr) + } + if tel != nil { + // Admin HTTP server (exposes /metrics when Prometheus exporter is enabled) + mux := http.NewServeMux() + mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) + if tel.PrometheusHandler != nil { + mux.Handle("/metrics", tel.PrometheusHandler) + } + admin := &http.Server{ + Addr: tcfg.AdminAddr, + Handler: otelhttp.NewHandler(mux, "newt-admin"), + ReadTimeout: 5 * time.Second, + WriteTimeout: 10 * time.Second, + ReadHeaderTimeout: 5 * time.Second, + IdleTimeout: 30 * time.Second, + } + go func() { + if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Warn("admin http error: %v", err) + } + }() + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _ = admin.Shutdown(ctx) + }() + defer func() { _ = 
tel.Shutdown(context.Background()) }()
+	}
+
 	newtVersion := "version_replaceme"
 	if *version {
 		fmt.Println("Newt version " + newtVersion)
@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
 	}
 	// Use reliable ping for initial connection test
 	logger.Debug("Testing initial connection with reliable ping...")
-	_, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
+	lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5)
+	if err == nil && wgData.PublicKey != "" {
+		telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds())
+	}
 	if err != nil {
 		logger.Warn("Initial reliable ping failed, but continuing: %v", err)
 	} else {
@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
 	// as the pings will continue in the background
 	if !connected {
 		logger.Debug("Starting ping check")
-		pingStopChan = startPingCheck(tnet, wgData.ServerIP, client)
+		pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey)
 	}
 
 	// Create proxy manager
 	pm = proxy.NewProxyManager(tnet)
+	pm.SetAsyncBytes(metricsAsyncBytes)
+	// Set tunnel_id for metrics (WireGuard peer public key)
+	pm.SetTunnelID(wgData.PublicKey)
 
 	connected = true
 
+	// telemetry: record a successful site registration (omit region unless available)
+	telemetry.IncSiteRegistration(context.Background(), id, "", "success")
+
 	// add the targets if there are any
 	if len(wgData.Targets.TCP) > 0 {
 		updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP})
@@ -611,10 +716,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
 	client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) {
 		logger.Info("Received reconnect message")
+		if wgData.PublicKey != "" {
+			telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
+		}
 		// Close the WireGuard device and TUN
 		closeWgTunnel()
+
+		// Clear metrics attrs and sessions for the tunnel
+		if pm != nil {
+			pm.ClearTunnelID()
+			state.Global().ClearTunnel(wgData.PublicKey)
+		}
+
 		// Mark as disconnected
 		connected = false
@@ -631,6 +746,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
 	client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) {
 		logger.Info("Received termination message")
+		if wgData.PublicKey != "" {
+			telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request")
+		}
 		// Close the WireGuard device and TUN
 		closeWgTunnel()
diff --git a/proxy/manager.go b/proxy/manager.go
index bf10322..86c47a8 100644
--- a/proxy/manager.go
+++ b/proxy/manager.go
@@ -1,16 +1,22 @@
 package proxy
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"net"
+	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
+	"github.com/fosrl/newt/internal/state"
+	"github.com/fosrl/newt/internal/telemetry"
 	"github.com/fosrl/newt/logger"
+	"go.opentelemetry.io/otel/attribute"
 	"golang.zx2c4.com/wireguard/tun/netstack"
 	"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
 )
 
 // Target represents a proxy target with its address and port
@@ -28,6 +34,52 @@ type ProxyManager struct {
 	udpConns    []*gonet.UDPConn
 	running     bool
 	mutex       sync.RWMutex
+
+	// telemetry (multi-tunnel)
+	currentTunnelID string
+	tunnels         map[string]*tunnelEntry
+	asyncBytes      bool
+	flushStop       chan struct{}
+}
+
+// tunnelEntry holds per-tunnel attributes and (optional) async counters.
+type tunnelEntry struct {
+	attrInTCP  attribute.Set
+	attrOutTCP attribute.Set
+	attrInUDP  attribute.Set
+	attrOutUDP attribute.Set
+
+	bytesInTCP  atomic.Uint64
+	bytesOutTCP atomic.Uint64
+	bytesInUDP  atomic.Uint64
+	bytesOutUDP atomic.Uint64
+}
+
+// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set.
+type countingWriter struct {
+	ctx   context.Context
+	w     io.Writer
+	set   attribute.Set
+	pm    *ProxyManager
+	ent   *tunnelEntry
+	out   bool   // false=in, true=out
+	proto string // "tcp" or "udp"
+}
+
+func (cw *countingWriter) Write(p []byte) (int, error) {
+	n, err := cw.w.Write(p)
+	if n > 0 {
+		if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil {
+			if cw.proto == "tcp" {
+				if cw.out { cw.ent.bytesOutTCP.Add(uint64(n)) } else { cw.ent.bytesInTCP.Add(uint64(n)) }
+			} else if cw.proto == "udp" {
+				if cw.out { cw.ent.bytesOutUDP.Add(uint64(n)) } else { cw.ent.bytesInUDP.Add(uint64(n)) }
+			}
+		} else {
+			telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set)
+		}
+	}
+	return n, err
 }
 
 // NewProxyManager creates a new proxy manager instance
@@ -38,9 +90,46 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
 		udpTargets: make(map[string]map[int]string),
 		listeners:  make([]*gonet.TCPListener, 0),
 		udpConns:   make([]*gonet.UDPConn, 0),
+		tunnels:    make(map[string]*tunnelEntry),
 	}
 }
 
+// SetTunnelID sets the WireGuard peer public key used as tunnel_id label.
+func (pm *ProxyManager) SetTunnelID(id string) {
+	pm.mutex.Lock()
+	defer pm.mutex.Unlock()
+	pm.currentTunnelID = id
+	if _, ok := pm.tunnels[id]; !ok {
+		pm.tunnels[id] = &tunnelEntry{}
+	}
+	e := pm.tunnels[id]
+	e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp"))
+	e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp"))
+	e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp"))
+	e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp"))
+}
+
+// ClearTunnelID clears cached attribute sets for the current tunnel.
+func (pm *ProxyManager) ClearTunnelID() {
+	pm.mutex.Lock()
+	defer pm.mutex.Unlock()
+	id := pm.currentTunnelID
+	if id == "" { return }
+	if e, ok := pm.tunnels[id]; ok {
+		// final flush for this tunnel; inline (not flushTunnels) because the
+		// write lock is held here and RLock would self-deadlock.
+		inTCP := e.bytesInTCP.Swap(0)
+		outTCP := e.bytesOutTCP.Swap(0)
+		inUDP := e.bytesInUDP.Swap(0)
+		outUDP := e.bytesOutUDP.Swap(0)
+		if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) }
+		if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) }
+		if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) }
+		if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) }
+		delete(pm.tunnels, id)
+	}
+	pm.currentTunnelID = ""
+}
+
 // init function without tnet
 func NewProxyManagerWithoutTNet() *ProxyManager {
 	return &ProxyManager{
@@ -160,6 +249,52 @@ func (pm *ProxyManager) Start() error {
 	return nil
 }
 
+// SetAsyncBytes toggles buffered byte counting; the background flush
+// goroutine is started once, on the first enable.
+func (pm *ProxyManager) SetAsyncBytes(b bool) {
+	pm.mutex.Lock()
+	defer pm.mutex.Unlock()
+	pm.asyncBytes = b
+	if b && pm.flushStop == nil {
+		pm.flushStop = make(chan struct{})
+		go pm.flushLoop()
+	}
+}
+
+// flushTunnels publishes and resets all accumulated per-tunnel byte counters.
+func (pm *ProxyManager) flushTunnels() {
+	pm.mutex.RLock()
+	defer pm.mutex.RUnlock()
+	for _, e := range pm.tunnels {
+		if v := e.bytesInTCP.Swap(0); v > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(v), e.attrInTCP) }
+		if v := e.bytesOutTCP.Swap(0); v > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(v), e.attrOutTCP) }
+		if v := e.bytesInUDP.Swap(0); v > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(v), e.attrInUDP) }
+		if v := e.bytesOutUDP.Swap(0); v > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(v), e.attrOutUDP) }
+	}
+}
+
+// flushLoop periodically drains async byte counters until flushStop fires,
+// with one final drain on shutdown.
+func (pm *ProxyManager) flushLoop() {
+	flushInterval := 2 * time.Second
+	// NOTE(review): the OTel spec defines OTEL_METRIC_EXPORT_INTERVAL in
+	// milliseconds (e.g. "60000"); ParseDuration expects "60s" — confirm format.
+	if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
+		if d, err := time.ParseDuration(v); err == nil && d > 0 && d/2 < flushInterval {
+			flushInterval = d / 2
+		}
+	}
+	ticker := time.NewTicker(flushInterval)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ticker.C:
+			pm.flushTunnels()
+		case <-pm.flushStop:
+			pm.flushTunnels() // final drain
+			return
+		}
+	}
+}
+
 func (pm *ProxyManager) Stop() error {
 	pm.mutex.Lock()
 	defer pm.mutex.Unlock()
@@ -236,6 +371,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr
 	return nil
 }
 
+// getEntry returns per-tunnel entry or nil.
+func (pm *ProxyManager) getEntry(id string) *tunnelEntry {
+	pm.mutex.RLock()
+	e := pm.tunnels[id]
+	pm.mutex.RUnlock()
+	return e
+}
+
 func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) {
 	for {
 		conn, err := listener.Accept()
@@ -257,6 +405,13 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
 			continue
 		}
 
+		// Count sessions once per accepted TCP connection; snapshot the
+		// tunnel id under the read lock so Inc/Dec use the same key.
+		pm.mutex.RLock()
+		tid := pm.currentTunnelID
+		pm.mutex.RUnlock()
+		if tid != "" { state.Global().IncSessions(tid) }
+
 		go func() {
 			target, err := net.Dial("tcp", targetAddr)
 			if err != nil {
@@ -265,24 +416,37 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
 				return
 			}
 
+			// already incremented on accept
+
 			// Create a WaitGroup to ensure both copy operations complete
 			var wg sync.WaitGroup
 			wg.Add(2)
 
+			// client -> target (direction=in)
 			go func() {
 				defer wg.Done()
-				io.Copy(target, conn)
-				target.Close()
+				var dst io.Writer = target
+				if e := pm.getEntry(tid); e != nil {
+					dst = &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"}
+				}
+				_, _ = io.Copy(dst, conn)
+				_ = target.Close()
 			}()
 
+			// target -> client (direction=out)
 			go func() {
 				defer wg.Done()
-				io.Copy(conn, target)
-				conn.Close()
+				var dst io.Writer = conn
+				if e := pm.getEntry(tid); e != nil {
+					dst = &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"}
+				}
+				_, _ = io.Copy(dst, target)
+				_ = conn.Close()
 			}()
 
-			// Wait for both copies to complete
+			// Wait for both copies to complete then session -1
 			wg.Wait()
+			if tid != "" { state.Global().DecSessions(tid) }
 		}()
 	}
 }
@@ -326,6 +486,21 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
 		}
 		clientKey := remoteAddr.String()
 
+		// bytes from client -> target (direction=in); read shared fields
+		// under the lock to avoid racing SetTunnelID/SetAsyncBytes.
+		if n > 0 {
+			pm.mutex.RLock()
+			e := pm.tunnels[pm.currentTunnelID]
+			async := pm.asyncBytes
+			pm.mutex.RUnlock()
+			if e != nil {
+				if async {
+					e.bytesInUDP.Add(uint64(n))
+				} else {
+					telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP)
+				}
+			}
+		}
 		clientsMutex.RLock()
 		targetConn, exists := clientConns[clientKey]
 		clientsMutex.RUnlock()
@@ -366,6 +534,21 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
 					return // defer will handle cleanup
 				}
 
+				// bytes from target -> client (direction=out); snapshot under lock.
+				if n > 0 {
+					pm.mutex.RLock()
+					e := pm.tunnels[pm.currentTunnelID]
+					async := pm.asyncBytes
+					pm.mutex.RUnlock()
+					if e != nil {
+						if async {
+							e.bytesOutUDP.Add(uint64(n))
+						} else {
+							telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP)
+						}
+					}
+				}
+
 				_, err = conn.WriteTo(buffer[:n], remoteAddr)
 				if err != nil {
 					logger.Error("Error writing to client: %v", err)
diff --git a/util.go b/util.go
index 7d6da4f..c1f4915 100644
--- a/util.go
+++ b/util.go
@@ -17,6 +17,7 @@ import (
+	"github.com/fosrl/newt/internal/telemetry"
 	"github.com/fosrl/newt/logger"
 	"github.com/fosrl/newt/proxy"
 	"github.com/fosrl/newt/websocket"
 	"golang.org/x/net/icmp"
 	"golang.org/x/net/ipv4"
 	"golang.zx2c4.com/wireguard/device"
@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC
 	return stopChan, fmt.Errorf("initial 
ping attempts failed, continuing in background") } -func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} { +func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} { maxInterval := 6 * time.Second currentInterval := pingInterval consecutiveFailures := 0 @@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien if !connectionLost { connectionLost = true logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) + if tunnelID != "" { + telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) + } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility err := client.SendMessage("newt/wg/register", map[string]interface{}{ @@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien } else { // Track recent latencies recentLatencies = append(recentLatencies, latency) + // Record tunnel latency (limit sampling to this periodic check) + if tunnelID != "" { + telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) + } if len(recentLatencies) > 10 { recentLatencies = recentLatencies[1:] } diff --git a/websocket/client.go b/websocket/client.go index 0c0664a..c9ac264 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -18,6 +18,10 @@ import ( "github.com/fosrl/newt/logger" "github.com/gorilla/websocket" + + "context" + "github.com/fosrl/newt/internal/telemetry" + "go.opentelemetry.io/otel" ) type Client struct { @@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) { } resp, err := client.Do(req) if err != nil { + telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err)) return "", 
fmt.Errorf("failed to request new token: %w", err) } defer resp.Body.Close() @@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) { if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") + bin := "http_other" + if resp.StatusCode >= 500 { + bin = "http_5xx" + } else if resp.StatusCode >= 400 { + bin = "http_4xx" + } + telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) + // Reconnect reason mapping for auth failures + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) + } return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) } @@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) { } logger.Debug("Received token: %s", tokenResp.Data.Token) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") return tokenResp.Data.Token, nil } +// classifyConnError maps common errors to low-cardinality error_type labels +func classifyConnError(err error) string { + if err == nil { + return "" + } + msg := strings.ToLower(err.Error()) + switch { + case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): + return "tls" + case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): + return "timeout" + case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"): + return "dns" + case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): + return "auth" + case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, 
"network is unreachable"): + return "io" + default: + return "other" + } +} + func (c *Client) connectWithRetry() { for { select { @@ -337,6 +377,10 @@ func (c *Client) establishConnection() error { // Get token for authentication token, err := c.getToken() if err != nil { + // telemetry: connection attempt failed before dialing + // site_id isn't globally available here; use client ID as site_id (low cardinality) + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") + telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) return fmt.Errorf("failed to get token: %w", err) } @@ -369,7 +413,11 @@ func (c *Client) establishConnection() error { q.Set("clientType", c.clientType) u.RawQuery = q.Encode() - // Connect to WebSocket + // Connect to WebSocket (optional span) + tr := otel.Tracer("newt") + spanCtx, span := tr.Start(context.Background(), "ws.connect") + defer span.End() + dialer := websocket.DefaultDialer // Use new TLS configuration method @@ -391,11 +439,23 @@ func (c *Client) establishConnection() error { logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") } - conn, _, err := dialer.Dial(u.String(), nil) +conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) if err != nil { + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") + etype := classifyConnError(err) + telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) + // Map handshake-related errors to reconnect reasons where appropriate + if etype == "tls" { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) + } else if etype == "timeout" { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) + } else { + telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) + } return fmt.Errorf("failed to connect to WebSocket: 
%w", err) } + telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") c.conn = conn c.setConnected(true) diff --git a/wg/wg.go b/wg/wg.go index 3cee1a9..a765279 100644 --- a/wg/wg.go +++ b/wg/wg.go @@ -3,6 +3,7 @@ package wg import ( + "context" "encoding/json" "errors" "fmt" @@ -23,6 +24,8 @@ import ( "golang.zx2c4.com/wireguard/conn" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/fosrl/newt/internal/telemetry" ) type WgConfig struct { @@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { s.stopGetConfig = nil } + // telemetry: config reload success + telemetry.IncConfigReload(context.Background(), "success") + // Optional reconnect reason mapping: config change + if s.serverPubKey != "" { + telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) + } + // Ensure the WireGuard interface and peers are configured if err := s.ensureWireguardInterface(config); err != nil { logger.Error("Failed to ensure WireGuard interface: %v", err) diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go index 6684c40..09f160e 100644 --- a/wgnetstack/wgnetstack.go +++ b/wgnetstack/wgnetstack.go @@ -1,6 +1,7 @@ package wgnetstack import ( + "context" "crypto/rand" "encoding/base64" "encoding/hex" @@ -26,6 +27,8 @@ import ( "golang.zx2c4.com/wireguard/tun" "golang.zx2c4.com/wireguard/tun/netstack" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/fosrl/newt/internal/telemetry" ) type WgConfig struct { @@ -240,14 +243,20 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str return service, nil } +// ReportRTT allows reporting native RTTs to telemetry, rate-limited externally. 
+func (s *WireGuardService) ReportRTT(seconds float64) {
+	if s.serverPubKey == "" { return }
+	telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds)
+}
+
 func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) {
 	logger.Debug("Received: %+v", msg)
 
 	// if there is no wgData or pm, we can't add targets
 	if s.TunnelIP == "" || s.proxyManager == nil {
 		logger.Info("No tunnel IP or proxy manager available")
 		return
 	}
 
 	targetData, err := parseTargetData(msg.Data)
 	if err != nil {