diff --git a/artifacts/test-results.txt b/artifacts/test-results.txt new file mode 100644 index 0000000..db66eb8 --- /dev/null +++ b/artifacts/test-results.txt @@ -0,0 +1,13 @@ +FAIL github.com/fosrl/newt [setup failed] +FAIL github.com/fosrl/newt/docker [setup failed] +FAIL github.com/fosrl/newt/internal/state [setup failed] +FAIL github.com/fosrl/newt/internal/telemetry [setup failed] +FAIL github.com/fosrl/newt/proxy [setup failed] +FAIL github.com/fosrl/newt/websocket [setup failed] +FAIL github.com/fosrl/newt/wgnetstack [setup failed] +? github.com/fosrl/newt/healthcheck [no test files] +? github.com/fosrl/newt/logger [no test files] +? github.com/fosrl/newt/network [no test files] +? github.com/fosrl/newt/updates [no test files] +FAIL github.com/fosrl/newt/wgtester [build failed] +FAIL diff --git a/internal/telemetry/testdata/expected_contains.golden b/internal/telemetry/testdata/expected_contains.golden new file mode 100644 index 0000000..48123dd --- /dev/null +++ b/internal/telemetry/testdata/expected_contains.golden @@ -0,0 +1,3 @@ +newt_connection_attempts_total +newt_build_info + diff --git a/patches/00_all_changes.patch b/patches/00_all_changes.patch new file mode 100644 index 0000000..ed7a234 --- /dev/null +++ b/patches/00_all_changes.patch @@ -0,0 +1,802 @@ +diff --git a/Dockerfile b/Dockerfile +index b9c4d29..b9b6dea 100644 +--- a/Dockerfile ++++ b/Dockerfile +@@ -22,6 +22,9 @@ RUN apk --no-cache add ca-certificates tzdata + COPY --from=builder /newt /usr/local/bin/ + COPY entrypoint.sh / + ++# Admin/metrics endpoint (Prometheus scrape) ++EXPOSE 2112 ++ + RUN chmod +x /entrypoint.sh + ENTRYPOINT ["/entrypoint.sh"] +-CMD ["newt"] +\ No newline at end of file ++CMD ["newt"] +diff --git a/go.mod b/go.mod +index d475835..5909955 100644 +--- a/go.mod ++++ b/go.mod +@@ -7,6 +7,14 @@ require ( + github.com/google/gopacket v1.1.19 + github.com/gorilla/websocket v1.5.3 + github.com/vishvananda/netlink v1.3.1 ++ 
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 ++ go.opentelemetry.io/contrib/instrumentation/runtime v0.62.0 ++ go.opentelemetry.io/otel v1.37.0 ++ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.37.0 ++ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 ++ go.opentelemetry.io/otel/sdk/metric v1.37.0 ++ go.opentelemetry.io/otel/sdk/trace v1.37.0 ++ go.opentelemetry.io/otel/semconv v1.26.0 + golang.org/x/crypto v0.42.0 + golang.org/x/exp v0.0.0-20250718183923-645b1fa84792 + golang.org/x/net v0.44.0 +diff --git a/main.go b/main.go +index 12849b1..c223b75 100644 +--- a/main.go ++++ b/main.go +@@ -1,7 +1,9 @@ + package main + + import ( ++ "context" + "encoding/json" ++ "errors" + "flag" + "fmt" + "net" +@@ -22,6 +24,9 @@ import ( + "github.com/fosrl/newt/updates" + "github.com/fosrl/newt/websocket" + ++ "github.com/fosrl/newt/internal/state" ++ "github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/device" + "golang.zx2c4.com/wireguard/tun" +@@ -116,6 +121,13 @@ var ( + healthMonitor *healthcheck.Monitor + enforceHealthcheckCert bool + ++ // Observability/metrics flags ++ metricsEnabled bool ++ otlpEnabled bool ++ adminAddr string ++ region string ++ metricsAsyncBytes bool ++ + // New mTLS configuration variables + tlsClientCert string + tlsClientKey string +@@ -126,6 +138,10 @@ var ( + ) + + func main() { ++ // Prepare context for graceful shutdown and signal handling ++ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ++ defer stop() ++ + // if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values + endpoint = os.Getenv("PANGOLIN_ENDPOINT") + id = os.Getenv("NEWT_ID") +@@ -141,6 +157,13 @@ func main() { + useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE") + 
enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT") + ++ // Metrics/observability env mirrors ++ metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED") ++ otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED") ++ adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") ++ regionEnv := os.Getenv("NEWT_REGION") ++ asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") ++ + keepInterface = keepInterfaceEnv == "true" + acceptClients = acceptClientsEnv == "true" + useNativeInterface = useNativeInterfaceEnv == "true" +@@ -272,6 +295,35 @@ func main() { + flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)") + } + ++ // Metrics/observability flags (mirror ENV if unset) ++ if metricsEnabledEnv == "" { ++ flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter") ++ } else { ++ if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true } ++ } ++ if otlpEnabledEnv == "" { ++ flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT") ++ } else { ++ if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v } ++ } ++ if adminAddrEnv == "" { ++ flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address") ++ } else { ++ adminAddr = adminAddrEnv ++ } ++ // Async bytes toggle ++ if asyncBytesEnv == "" { ++ flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)") ++ } else { ++ if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v } ++ } ++ // Optional region flag (resource attribute) ++ if regionEnv == "" { ++ flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") ++ } else { ++ region = regionEnv ++ } ++ + // do a --version check + version := flag.Bool("version", false, "Print the 
version") + +@@ -286,6 +338,50 @@ func main() { + loggerLevel := parseLogLevel(logLevel) + logger.GetLogger().SetLevel(parseLogLevel(logLevel)) + ++ // Initialize telemetry after flags are parsed (so flags override env) ++ tcfg := telemetry.FromEnv() ++ tcfg.PromEnabled = metricsEnabled ++ tcfg.OTLPEnabled = otlpEnabled ++ if adminAddr != "" { tcfg.AdminAddr = adminAddr } ++ // Resource attributes (if available) ++ tcfg.SiteID = id ++ tcfg.Region = region ++ // Build info ++ tcfg.BuildVersion = newtVersion ++ tcfg.BuildCommit = os.Getenv("NEWT_COMMIT") ++ ++ tel, telErr := telemetry.Init(ctx, tcfg) ++ if telErr != nil { ++ logger.Warn("Telemetry init failed: %v", telErr) ++ } ++ if tel != nil { ++ // Admin HTTP server (exposes /metrics when Prometheus exporter is enabled) ++ mux := http.NewServeMux() ++ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) ++ if tel.PrometheusHandler != nil { ++ mux.Handle("/metrics", tel.PrometheusHandler) ++ } ++ admin := &http.Server{ ++ Addr: tcfg.AdminAddr, ++ Handler: otelhttp.NewHandler(mux, "newt-admin"), ++ ReadTimeout: 5 * time.Second, ++ WriteTimeout: 10 * time.Second, ++ ReadHeaderTimeout: 5 * time.Second, ++ IdleTimeout: 30 * time.Second, ++ } ++ go func() { ++ if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { ++ logger.Warn("admin http error: %v", err) ++ } ++ }() ++ defer func() { ++ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ++ defer cancel() ++ _ = admin.Shutdown(ctx) ++ }() ++ defer func() { _ = tel.Shutdown(context.Background()) }() ++ } ++ + newtVersion := "version_replaceme" + if *version { + fmt.Println("Newt version " + newtVersion) +@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + } + // Use reliable ping for initial connection test + logger.Debug("Testing initial connection with reliable ping...") +- _, err = reliablePing(tnet, wgData.ServerIP, 
pingTimeout, 5) ++ lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) ++ if err == nil && wgData.PublicKey != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds()) ++ } + if err != nil { + logger.Warn("Initial reliable ping failed, but continuing: %v", err) + } else { +@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + // as the pings will continue in the background + if !connected { + logger.Debug("Starting ping check") +- pingStopChan = startPingCheck(tnet, wgData.ServerIP, client) ++ pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey) + } + + // Create proxy manager + pm = proxy.NewProxyManager(tnet) ++ pm.SetAsyncBytes(metricsAsyncBytes) ++ // Set tunnel_id for metrics (WireGuard peer public key) ++ pm.SetTunnelID(wgData.PublicKey) + + connected = true + ++ // telemetry: record a successful site registration (omit region unless available) ++ telemetry.IncSiteRegistration(context.Background(), id, "", "success") ++ + // add the targets if there are any + if len(wgData.Targets.TCP) > 0 { + updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) +@@ -611,10 +716,19 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { + logger.Info("Received reconnect message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() + ++ // Clear metrics attrs and sessions for the tunnel ++ if pm != nil { ++ pm.ClearTunnelID() ++ state.Global().ClearTunnel(wgData.PublicKey) ++ } ++ + // Mark as disconnected + connected
= false + +@@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { + logger.Info("Received termination message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() +diff --git a/proxy/manager.go b/proxy/manager.go +index bf10322..86c47a8 100644 +--- a/proxy/manager.go ++++ b/proxy/manager.go +@@ -1,16 +1,22 @@ + package proxy + + import ( ++ "context" + "fmt" + "io" + "net" ++ "os" + "strings" + "sync" ++ "sync/atomic" + "time" + ++ "github.com/fosrl/newt/internal/state" ++ "github.com/fosrl/newt/internal/telemetry" + "github.com/fosrl/newt/logger" + "golang.zx2c4.com/wireguard/tun/netstack" + "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" ++ "go.opentelemetry.io/otel/attribute" + ) + + // Target represents a proxy target with its address and port +@@ -28,6 +34,52 @@ type ProxyManager struct { + udpConns []*gonet.UDPConn + running bool + mutex sync.RWMutex ++ ++ // telemetry (multi-tunnel) ++ currentTunnelID string ++ tunnels map[string]*tunnelEntry ++ asyncBytes bool ++ flushStop chan struct{} ++} ++ ++// tunnelEntry holds per-tunnel attributes and (optional) async counters. ++type tunnelEntry struct { ++ attrInTCP attribute.Set ++ attrOutTCP attribute.Set ++ attrInUDP attribute.Set ++ attrOutUDP attribute.Set ++ ++ bytesInTCP atomic.Uint64 ++ bytesOutTCP atomic.Uint64 ++ bytesInUDP atomic.Uint64 ++ bytesOutUDP atomic.Uint64 ++} ++ ++// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set. 
++type countingWriter struct { ++ ctx context.Context ++ w io.Writer ++ set attribute.Set ++ pm *ProxyManager ++ ent *tunnelEntry ++ out bool // false=in, true=out ++ proto string // "tcp" or "udp" ++} ++ ++func (cw *countingWriter) Write(p []byte) (int, error) { ++ n, err := cw.w.Write(p) ++ if n > 0 { ++ if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil { ++ if cw.proto == "tcp" { ++ if cw.out { cw.ent.bytesOutTCP.Add(uint64(n)) } else { cw.ent.bytesInTCP.Add(uint64(n)) } ++ } else if cw.proto == "udp" { ++ if cw.out { cw.ent.bytesOutUDP.Add(uint64(n)) } else { cw.ent.bytesInUDP.Add(uint64(n)) } ++ } ++ } else { ++ telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set) ++ } ++ } ++ return n, err + } + + // NewProxyManager creates a new proxy manager instance +@@ -38,9 +90,46 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager { + udpTargets: make(map[string]map[int]string), + listeners: make([]*gonet.TCPListener, 0), + udpConns: make([]*gonet.UDPConn, 0), ++ tunnels: make(map[string]*tunnelEntry), + } + } + ++// SetTunnelID sets the WireGuard peer public key used as tunnel_id label. ++func (pm *ProxyManager) SetTunnelID(id string) { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ pm.currentTunnelID = id ++ if _, ok := pm.tunnels[id]; !ok { ++ pm.tunnels[id] = &tunnelEntry{} ++ } ++ e := pm.tunnels[id] ++ e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp")) ++ e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp")) ++ e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp")) ++ e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp")) ++} ++ ++// ClearTunnelID clears cached attribute sets for the current tunnel. 
++func (pm *ProxyManager) ClearTunnelID() { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ id := pm.currentTunnelID ++ if id == "" { return } ++ if e, ok := pm.tunnels[id]; ok { ++ // final flush for this tunnel ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ delete(pm.tunnels, id) ++ } ++ pm.currentTunnelID = "" ++} ++ + // init function without tnet + func NewProxyManagerWithoutTNet() *ProxyManager { + return &ProxyManager{ +@@ -160,6 +249,57 @@ func (pm *ProxyManager) Start() error { + return nil + } + ++func (pm *ProxyManager) SetAsyncBytes(b bool) { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ pm.asyncBytes = b ++ if b && pm.flushStop == nil { ++ pm.flushStop = make(chan struct{}) ++ go pm.flushLoop() ++ } ++} ++func (pm *ProxyManager) flushLoop() { ++ flushInterval := 2 * time.Second ++ if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" { ++ if d, err := time.ParseDuration(v); err == nil && d > 0 { ++ if d/2 < flushInterval { flushInterval = d / 2 } ++ } ++ } ++ ticker := time.NewTicker(flushInterval) ++ defer ticker.Stop() ++ for { ++ select { ++ case <-ticker.C: ++ pm.mutex.RLock() ++ for _, e := range pm.tunnels { ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { 
telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ } ++ pm.mutex.RUnlock() ++ case <-pm.flushStop: ++ pm.mutex.RLock() ++ for _, e := range pm.tunnels { ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ } ++ pm.mutex.RUnlock() ++ return ++ } ++ } ++} ++ + func (pm *ProxyManager) Stop() error { + pm.mutex.Lock() + defer pm.mutex.Unlock() +@@ -236,6 +376,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr + return nil + } + ++// getEntry returns per-tunnel entry or nil. 
++func (pm *ProxyManager) getEntry(id string) *tunnelEntry { ++ pm.mutex.RLock() ++ e := pm.tunnels[id] ++ pm.mutex.RUnlock() ++ return e ++} ++ + func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) { + for { + conn, err := listener.Accept() +@@ -257,6 +405,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) + continue + } + ++// Count sessions only once per accepted TCP connection ++ if pm.currentTunnelID != "" { state.Global().IncSessions(pm.currentTunnelID) } ++ + go func() { + target, err := net.Dial("tcp", targetAddr) + if err != nil { +@@ -265,24 +416,33 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) + return + } + ++ // already incremented on accept ++ + // Create a WaitGroup to ensure both copy operations complete + var wg sync.WaitGroup + wg.Add(2) + ++ // client -> target (direction=in) + go func() { + defer wg.Done() +- io.Copy(target, conn) +- target.Close() ++e := pm.getEntry(pm.currentTunnelID) ++cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"} ++ _, _ = io.Copy(cw, conn) ++ _ = target.Close() + }() + ++ // target -> client (direction=out) + go func() { + defer wg.Done() +- io.Copy(conn, target) +- conn.Close() ++e := pm.getEntry(pm.currentTunnelID) ++cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"} ++ _, _ = io.Copy(cw, target) ++ _ = conn.Close() + }() + +- // Wait for both copies to complete ++ // Wait for both copies to complete then session -1 + wg.Wait() ++ if pm.currentTunnelID != "" { state.Global().DecSessions(pm.currentTunnelID) } + }() + } + } +@@ -326,6 +486,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + } + + clientKey := remoteAddr.String() ++ // bytes from client -> target (direction=in) ++if pm.currentTunnelID != "" && n > 0 { ++if pm.asyncBytes { ++ if e :=
pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(n)) } ++ } else { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP) } ++ } ++ } + clientsMutex.RLock() + targetConn, exists := clientConns[clientKey] + clientsMutex.RUnlock() +@@ -366,6 +534,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + return // defer will handle cleanup + } + ++ // bytes from target -> client (direction=out) ++ if pm.currentTunnelID != "" && n > 0 { ++ if pm.asyncBytes { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesOutUDP.Add(uint64(n)) } ++ } else { ++if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP) } ++ } ++ } ++ + _, err = conn.WriteTo(buffer[:n], remoteAddr) + if err != nil { + logger.Error("Error writing to client: %v", err) +@@ -375,13 +552,19 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + }(clientKey, targetConn, remoteAddr) + } + +- _, err = targetConn.Write(buffer[:n]) ++ written, err := targetConn.Write(buffer[:n]) + if err != nil { + logger.Error("Error writing to target: %v", err) + targetConn.Close() + clientsMutex.Lock() + delete(clientConns, clientKey) + clientsMutex.Unlock() ++} else if pm.currentTunnelID != "" && written > 0 { ++ if pm.asyncBytes { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written)) } ++ } else { ++if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(written), e.attrInUDP) } ++ } + } + } + } +diff --git a/util.go b/util.go +index 7d6da4f..c1f4915 100644 +--- a/util.go ++++ b/util.go +@@ -17,6 +17,7 @@ import ( + "github.com/fosrl/newt/logger" + "github.com/fosrl/newt/proxy" + "github.com/fosrl/newt/websocket" ++ "github.com/fosrl/newt/internal/telemetry" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + 
"golang.zx2c4.com/wireguard/device" +@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC + return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background") + } + +-func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} { ++func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} { + maxInterval := 6 * time.Second + currentInterval := pingInterval + consecutiveFailures := 0 +@@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + if !connectionLost { + connectionLost = true + logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) ++ if tunnelID != "" { ++ telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) ++ } + stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) + // Send registration message to the server for backward compatibility + err := client.SendMessage("newt/wg/register", map[string]interface{}{ +@@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + } else { + // Track recent latencies + recentLatencies = append(recentLatencies, latency) ++ // Record tunnel latency (limit sampling to this periodic check) ++ if tunnelID != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) ++ } + if len(recentLatencies) > 10 { + recentLatencies = recentLatencies[1:] + } +diff --git a/websocket/client.go b/websocket/client.go +index 0c0664a..c9ac264 100644 +--- a/websocket/client.go ++++ b/websocket/client.go +@@ -18,6 +18,10 @@ import ( + + "github.com/fosrl/newt/logger" + "github.com/gorilla/websocket" ++ ++ "context" ++ "github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/otel" + ) + + type 
Client struct { +@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) { + } + resp, err := client.Do(req) + if err != nil { ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err)) + return "", fmt.Errorf("failed to request new token: %w", err) + } + defer resp.Body.Close() +@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) { + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") ++ bin := "http_other" ++ if resp.StatusCode >= 500 { ++ bin = "http_5xx" ++ } else if resp.StatusCode >= 400 { ++ bin = "http_4xx" ++ } ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) ++ // Reconnect reason mapping for auth failures ++ if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) ++ } + return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) + } + +@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) { + } + + logger.Debug("Received token: %s", tokenResp.Data.Token) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") + + return tokenResp.Data.Token, nil + } + ++// classifyConnError maps common errors to low-cardinality error_type labels ++func classifyConnError(err error) string { ++ if err == nil { ++ return "" ++ } ++ msg := strings.ToLower(err.Error()) ++ switch { ++ case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): ++ return "tls" ++ case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): ++ return "timeout" ++ case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"): ++ return "dns" ++ case 
strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): ++ return "auth" ++ case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"): ++ return "io" ++ default: ++ return "other" ++ } ++} ++ + func (c *Client) connectWithRetry() { + for { + select { +@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error { + // Get token for authentication + token, err := c.getToken() + if err != nil { ++ // telemetry: connection attempt failed before dialing ++ // site_id isn't globally available here; use client ID as site_id (low cardinality) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) + return fmt.Errorf("failed to get token: %w", err) + } + +@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error { + q.Set("clientType", c.clientType) + u.RawQuery = q.Encode() + +- // Connect to WebSocket ++ // Connect to WebSocket (optional span) ++ tr := otel.Tracer("newt") ++ spanCtx, span := tr.Start(context.Background(), "ws.connect") ++ defer span.End() ++ + dialer := websocket.DefaultDialer + + // Use new TLS configuration method +@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error { + logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") + } + +- conn, _, err := dialer.Dial(u.String(), nil) ++conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) + if err != nil { ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ etype := classifyConnError(err) ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) ++ // Map handshake-related errors to reconnect reasons where appropriate ++ if etype == 
"tls" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) ++ } else if etype == "timeout" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) ++ } else { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) ++ } + return fmt.Errorf("failed to connect to WebSocket: %w", err) + } + ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") + c.conn = conn + c.setConnected(true) + +diff --git a/wg/wg.go b/wg/wg.go +index 3cee1a9..a765279 100644 +--- a/wg/wg.go ++++ b/wg/wg.go +@@ -3,6 +3,7 @@ + package wg + + import ( ++ "context" + "encoding/json" + "errors" + "fmt" +@@ -23,6 +24,8 @@ import ( + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/wgctrl" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ "github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { + s.stopGetConfig = nil + } + ++ // telemetry: config reload success ++ telemetry.IncConfigReload(context.Background(), "success") ++ // Optional reconnect reason mapping: config change ++ if s.serverPubKey != "" { ++ telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) ++ } ++ + // Ensure the WireGuard interface and peers are configured + if err := s.ensureWireguardInterface(config); err != nil { + logger.Error("Failed to ensure WireGuard interface: %v", err) +diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go +index 6684c40..09f160e 100644 +--- a/wgnetstack/wgnetstack.go ++++ b/wgnetstack/wgnetstack.go +@@ -1,6 +1,7 @@ + package wgnetstack + + import ( ++ "context" + "crypto/rand" + "encoding/base64" + "encoding/hex" +@@ -26,6 +27,8 @@ import ( + "golang.zx2c4.com/wireguard/tun" + "golang.zx2c4.com/wireguard/tun/netstack" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ 
"github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -240,14 +243,20 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str + return service, nil + } + ++// ReportRTT allows reporting native RTTs to telemetry, rate-limited externally. ++func (s *WireGuardService) ReportRTT(seconds float64) { ++ if s.serverPubKey == "" { return } ++ telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds) ++} ++ + func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) { + logger.Debug("Received: %+v", msg) + + // if there is no wgData or pm, we can't add targets + if s.TunnelIP == "" || s.proxyManager == nil { + logger.Info("No tunnel IP or proxy manager available") +- return +- } ++ return ++} + + targetData, err := parseTargetData(msg.Data) + if err != nil { diff --git a/patches/01_proxy_multitunnel.patch b/patches/01_proxy_multitunnel.patch new file mode 100644 index 0000000..c4aafb6 --- /dev/null +++ b/patches/01_proxy_multitunnel.patch @@ -0,0 +1,301 @@ +diff --git a/proxy/manager.go b/proxy/manager.go +index bf10322..86c47a8 100644 +--- a/proxy/manager.go ++++ b/proxy/manager.go +@@ -1,16 +1,22 @@ + package proxy + + import ( ++ "context" + "fmt" + "io" + "net" ++ "os" + "strings" + "sync" ++ "sync/atomic" + "time" + ++ "github.com/fosrl/newt/internal/state" ++ "github.com/fosrl/newt/internal/telemetry" + "github.com/fosrl/newt/logger" + "golang.zx2c4.com/wireguard/tun/netstack" + "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" ++ "go.opentelemetry.io/otel/attribute" + ) + + // Target represents a proxy target with its address and port +@@ -28,6 +34,52 @@ type ProxyManager struct { + udpConns []*gonet.UDPConn + running bool + mutex sync.RWMutex ++ ++ // telemetry (multi-tunnel) ++ currentTunnelID string ++ tunnels map[string]*tunnelEntry ++ asyncBytes bool ++ flushStop chan struct{} ++} ++ ++// tunnelEntry holds per-tunnel attributes and (optional) async counters. 
++type tunnelEntry struct { ++ attrInTCP attribute.Set ++ attrOutTCP attribute.Set ++ attrInUDP attribute.Set ++ attrOutUDP attribute.Set ++ ++ bytesInTCP atomic.Uint64 ++ bytesOutTCP atomic.Uint64 ++ bytesInUDP atomic.Uint64 ++ bytesOutUDP atomic.Uint64 ++} ++ ++// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set. ++type countingWriter struct { ++ ctx context.Context ++ w io.Writer ++ set attribute.Set ++ pm *ProxyManager ++ ent *tunnelEntry ++ out bool // false=in, true=out ++ proto string // "tcp" or "udp" ++} ++ ++func (cw *countingWriter) Write(p []byte) (int, error) { ++ n, err := cw.w.Write(p) ++ if n > 0 { ++ if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil { ++ if cw.proto == "tcp" { ++ if cw.out { cw.ent.bytesOutTCP.Add(uint64(n)) } else { cw.ent.bytesInTCP.Add(uint64(n)) } ++ } else if cw.proto == "udp" { ++ if cw.out { cw.ent.bytesOutUDP.Add(uint64(n)) } else { cw.ent.bytesInUDP.Add(uint64(n)) } ++ } ++ } else { ++ telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set) ++ } ++ } ++ return n, err + } + + // NewProxyManager creates a new proxy manager instance +@@ -38,9 +90,46 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager { + udpTargets: make(map[string]map[int]string), + listeners: make([]*gonet.TCPListener, 0), + udpConns: make([]*gonet.UDPConn, 0), ++ tunnels: make(map[string]*tunnelEntry), + } + } + ++// SetTunnelID sets the WireGuard peer public key used as tunnel_id label. 
++func (pm *ProxyManager) SetTunnelID(id string) { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ pm.currentTunnelID = id ++ if _, ok := pm.tunnels[id]; !ok { ++ pm.tunnels[id] = &tunnelEntry{} ++ } ++ e := pm.tunnels[id] ++ e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp")) ++ e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp")) ++ e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp")) ++ e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp")) ++} ++ ++// ClearTunnelID clears cached attribute sets for the current tunnel. ++func (pm *ProxyManager) ClearTunnelID() { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ id := pm.currentTunnelID ++ if id == "" { return } ++ if e, ok := pm.tunnels[id]; ok { ++ // final flush for this tunnel ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ delete(pm.tunnels, id) ++ } ++ pm.currentTunnelID = "" ++} ++ + // init function without tnet + func NewProxyManagerWithoutTNet() *ProxyManager { + return &ProxyManager{ +@@ -160,6 +249,57 @@ func (pm *ProxyManager) Start() error { + return nil + } + ++func (pm *ProxyManager) SetAsyncBytes(b bool) { ++ pm.mutex.Lock() ++ defer pm.mutex.Unlock() ++ pm.asyncBytes = b ++ if 
b && pm.flushStop == nil { ++ pm.flushStop = make(chan struct{}) ++ go pm.flushLoop() ++ } ++} ++func (pm *ProxyManager) flushLoop() { ++ flushInterval := 2 * time.Second ++ if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" { ++ if d, err := time.ParseDuration(v); err == nil && d > 0 { ++ if d/2 < flushInterval { flushInterval = d / 2 } ++ } ++ } ++ ticker := time.NewTicker(flushInterval) ++ defer ticker.Stop() ++ for { ++ select { ++ case <-ticker.C: ++ pm.mutex.RLock() ++ for _, e := range pm.tunnels { ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ } ++ pm.mutex.RUnlock() ++ case <-pm.flushStop: ++ pm.mutex.RLock() ++ for _, e := range pm.tunnels { ++ inTCP := e.bytesInTCP.Swap(0) ++ outTCP := e.bytesOutTCP.Swap(0) ++ inUDP := e.bytesInUDP.Swap(0) ++ outUDP := e.bytesOutUDP.Swap(0) ++ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) } ++ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) } ++ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) } ++ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) } ++ } ++ pm.mutex.RUnlock() ++ return ++ } ++ } ++} ++ + func (pm *ProxyManager) Stop() error { + pm.mutex.Lock() + defer pm.mutex.Unlock() +@@ -236,6 +376,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr + return nil + } + ++// getEntry returns per-tunnel entry or nil. 
++func (pm *ProxyManager) getEntry(id string) *tunnelEntry { ++ pm.mutex.RLock() ++ e := pm.tunnels[id] ++ pm.mutex.RUnlock() ++ return e ++} ++ + func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) { + for { + conn, err := listener.Accept() +@@ -257,6 +405,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) + continue + } + ++// Count sessions only once per accepted TCP connection ++ if pm.tunnelID != "" { state.Global().IncSessions(pm.tunnelID) } ++ + go func() { + target, err := net.Dial("tcp", targetAddr) + if err != nil { +@@ -265,24 +416,33 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) + return + } + ++ // already incremented on accept ++ + // Create a WaitGroup to ensure both copy operations complete + var wg sync.WaitGroup + wg.Add(2) + ++ // client -> target (direction=in) + go func() { + defer wg.Done() +- io.Copy(target, conn) +- target.Close() ++e := pm.getEntry(pm.currentTunnelID) ++cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"} ++ _, _ = io.Copy(cw, conn) ++ _ = target.Close() + }() + ++ // target -> client (direction=out) + go func() { + defer wg.Done() +- io.Copy(conn, target) +- conn.Close() ++e := pm.getEntry(pm.currentTunnelID) ++cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"} ++ _, _ = io.Copy(cw, target) ++ _ = conn.Close() + }() + +- // Wait for both copies to complete ++ // Wait for both copies to complete then session -1 + wg.Wait() ++ if pm.tunnelID != "" { state.Global().DecSessions(pm.tunnelID) } + }() + } + } +@@ -326,6 +486,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + } + + clientKey := remoteAddr.String() ++ // bytes from client -> target (direction=in) ++if pm.currentTunnelID != "" && n > 0 { ++if pm.asyncBytes { ++ if e := 
pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(n)) } ++ } else { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP) } ++ } ++ } + clientsMutex.RLock() + targetConn, exists := clientConns[clientKey] + clientsMutex.RUnlock() +@@ -366,6 +534,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + return // defer will handle cleanup + } + ++ // bytes from target -> client (direction=out) ++ if pm.currentTunnelID != "" && n > 0 { ++ if pm.asyncBytes { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesOutUDP.Add(uint64(n)) } ++ } else { ++if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP) } ++ } ++ } ++ + _, err = conn.WriteTo(buffer[:n], remoteAddr) + if err != nil { + logger.Error("Error writing to client: %v", err) +@@ -375,13 +552,19 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { + }(clientKey, targetConn, remoteAddr) + } + +- _, err = targetConn.Write(buffer[:n]) ++ written, err := targetConn.Write(buffer[:n]) + if err != nil { + logger.Error("Error writing to target: %v", err) + targetConn.Close() + clientsMutex.Lock() + delete(clientConns, clientKey) + clientsMutex.Unlock() ++} else if pm.currentTunnelID != "" && written > 0 { ++ if pm.asyncBytes { ++ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written)) } ++ } else { ++if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(written), e.attrInUDP) } ++ } + } + } + } diff --git a/patches/02_reconnect_reasons.patch b/patches/02_reconnect_reasons.patch new file mode 100644 index 0000000..c70560f --- /dev/null +++ b/patches/02_reconnect_reasons.patch @@ -0,0 +1,422 @@ +diff --git a/main.go b/main.go +index 12849b1..c223b75 100644 +--- a/main.go ++++ b/main.go +@@ -1,7 +1,9 @@ + package 
main + + import ( ++ "context" + "encoding/json" ++ "errors" + "flag" + "fmt" + "net" +@@ -22,6 +24,9 @@ import ( + "github.com/fosrl/newt/updates" + "github.com/fosrl/newt/websocket" + ++ "github.com/fosrl/newt/internal/state" ++ "github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/device" + "golang.zx2c4.com/wireguard/tun" +@@ -116,6 +121,13 @@ var ( + healthMonitor *healthcheck.Monitor + enforceHealthcheckCert bool + ++ // Observability/metrics flags ++ metricsEnabled bool ++ otlpEnabled bool ++ adminAddr string ++ region string ++ metricsAsyncBytes bool ++ + // New mTLS configuration variables + tlsClientCert string + tlsClientKey string +@@ -126,6 +138,10 @@ var ( + ) + + func main() { ++ // Prepare context for graceful shutdown and signal handling ++ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ++ defer stop() ++ + // if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values + endpoint = os.Getenv("PANGOLIN_ENDPOINT") + id = os.Getenv("NEWT_ID") +@@ -141,6 +157,13 @@ func main() { + useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE") + enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT") + ++ // Metrics/observability env mirrors ++ metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED") ++ otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED") ++ adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") ++ regionEnv := os.Getenv("NEWT_REGION") ++ asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") ++ + keepInterface = keepInterfaceEnv == "true" + acceptClients = acceptClientsEnv == "true" + useNativeInterface = useNativeInterfaceEnv == "true" +@@ -272,6 +295,35 @@ func main() { + flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)") + } + ++ // Metrics/observability 
flags (mirror ENV if unset) ++ if metricsEnabledEnv == "" { ++ flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter") ++ } else { ++ if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true } ++ } ++ if otlpEnabledEnv == "" { ++ flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT") ++ } else { ++ if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v } ++ } ++ if adminAddrEnv == "" { ++ flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address") ++ } else { ++ adminAddr = adminAddrEnv ++ } ++ // Async bytes toggle ++ if asyncBytesEnv == "" { ++ flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)") ++ } else { ++ if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v } ++ } ++ // Optional region flag (resource attribute) ++ if regionEnv == "" { ++ flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") ++ } else { ++ region = regionEnv ++ } ++ + // do a --version check + version := flag.Bool("version", false, "Print the version") + +@@ -286,6 +338,50 @@ func main() { + loggerLevel := parseLogLevel(logLevel) + logger.GetLogger().SetLevel(parseLogLevel(logLevel)) + ++ // Initialize telemetry after flags are parsed (so flags override env) ++ tcfg := telemetry.FromEnv() ++ tcfg.PromEnabled = metricsEnabled ++ tcfg.OTLPEnabled = otlpEnabled ++ if adminAddr != "" { tcfg.AdminAddr = adminAddr } ++ // Resource attributes (if available) ++ tcfg.SiteID = id ++ tcfg.Region = region ++ // Build info ++ tcfg.BuildVersion = newtVersion ++ tcfg.BuildCommit = os.Getenv("NEWT_COMMIT") ++ ++ tel, telErr := telemetry.Init(ctx, tcfg) ++ if telErr != nil { ++ logger.Warn("Telemetry init failed: %v", telErr) ++ } ++ if tel != nil { ++ // 
Admin HTTP server (exposes /metrics when Prometheus exporter is enabled) ++ mux := http.NewServeMux() ++ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) ++ if tel.PrometheusHandler != nil { ++ mux.Handle("/metrics", tel.PrometheusHandler) ++ } ++ admin := &http.Server{ ++ Addr: tcfg.AdminAddr, ++ Handler: otelhttp.NewHandler(mux, "newt-admin"), ++ ReadTimeout: 5 * time.Second, ++ WriteTimeout: 10 * time.Second, ++ ReadHeaderTimeout: 5 * time.Second, ++ IdleTimeout: 30 * time.Second, ++ } ++ go func() { ++ if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { ++ logger.Warn("admin http error: %v", err) ++ } ++ }() ++ defer func() { ++ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ++ defer cancel() ++ _ = admin.Shutdown(ctx) ++ }() ++ defer func() { _ = tel.Shutdown(context.Background()) }() ++ } ++ + newtVersion := "version_replaceme" + if *version { + fmt.Println("Newt version " + newtVersion) +@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + } + // Use reliable ping for initial connection test + logger.Debug("Testing initial connection with reliable ping...") +- _, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) ++ lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) ++ if err == nil && wgData.PublicKey != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds()) ++ } + if err != nil { + logger.Warn("Initial reliable ping failed, but continuing: %v", err) + } else { +@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + // as the pings will continue in the background + if !connected { + logger.Debug("Starting ping check") +- pingStopChan = startPingCheck(tnet, wgData.ServerIP, client) ++ pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey) + } + + // 
Create proxy manager + pm = proxy.NewProxyManager(tnet) ++ pm.SetAsyncBytes(metricsAsyncBytes) ++ // Set tunnel_id for metrics (WireGuard peer public key) ++ pm.SetTunnelID(wgData.PublicKey) + + connected = true + ++ // telemetry: record a successful site registration (omit region unless available) ++ telemetry.IncSiteRegistration(context.Background(), id, "", "success") ++ + // add the targets if there are any + if len(wgData.Targets.TCP) > 0 { + updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) +@@ -611,10 +716,25 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { + logger.Info("Received reconnect message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() + ++ // Clear metrics attrs and sessions for the tunnel ++ if pm != nil { ++ pm.ClearTunnelID() ++ state.Global().ClearTunnel(wgData.PublicKey) ++ } ++ ++ // Clear metrics attrs and sessions for the tunnel ++ if pm != nil { ++ pm.ClearTunnelID() ++ state.Global().ClearTunnel(wgData.PublicKey) ++ } ++ + // Mark as disconnected + connected = false + +@@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { + logger.Info("Received termination message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() +diff --git a/util.go b/util.go +index 7d6da4f..c1f4915 100644 +--- a/util.go ++++ b/util.go +@@ -17,6 +17,7 @@ import ( + "github.com/fosrl/newt/logger" + "github.com/fosrl/newt/proxy" + "github.com/fosrl/newt/websocket" ++ "github.com/fosrl/newt/internal/telemetry" + 
"golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.zx2c4.com/wireguard/device" +@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC + return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background") + } + +-func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} { ++func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} { + maxInterval := 6 * time.Second + currentInterval := pingInterval + consecutiveFailures := 0 +@@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + if !connectionLost { + connectionLost = true + logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) ++ if tunnelID != "" { ++ telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) ++ } + stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) + // Send registration message to the server for backward compatibility + err := client.SendMessage("newt/wg/register", map[string]interface{}{ +@@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + } else { + // Track recent latencies + recentLatencies = append(recentLatencies, latency) ++ // Record tunnel latency (limit sampling to this periodic check) ++ if tunnelID != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) ++ } + if len(recentLatencies) > 10 { + recentLatencies = recentLatencies[1:] + } +diff --git a/websocket/client.go b/websocket/client.go +index 0c0664a..c9ac264 100644 +--- a/websocket/client.go ++++ b/websocket/client.go +@@ -18,6 +18,10 @@ import ( + + "github.com/fosrl/newt/logger" + "github.com/gorilla/websocket" ++ ++ "context" ++ 
"github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/otel" + ) + + type Client struct { +@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) { + } + resp, err := client.Do(req) + if err != nil { ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err)) + return "", fmt.Errorf("failed to request new token: %w", err) + } + defer resp.Body.Close() +@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) { + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") ++ bin := "http_other" ++ if resp.StatusCode >= 500 { ++ bin = "http_5xx" ++ } else if resp.StatusCode >= 400 { ++ bin = "http_4xx" ++ } ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) ++ // Reconnect reason mapping for auth failures ++ if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) ++ } + return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) + } + +@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) { + } + + logger.Debug("Received token: %s", tokenResp.Data.Token) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") + + return tokenResp.Data.Token, nil + } + ++// classifyConnError maps common errors to low-cardinality error_type labels ++func classifyConnError(err error) string { ++ if err == nil { ++ return "" ++ } ++ msg := strings.ToLower(err.Error()) ++ switch { ++ case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): ++ return "tls" ++ case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): ++ return "timeout" ++ case strings.Contains(msg, "no 
such host") || strings.Contains(msg, "dns"): ++ return "dns" ++ case strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): ++ return "auth" ++ case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"): ++ return "io" ++ default: ++ return "other" ++ } ++} ++ + func (c *Client) connectWithRetry() { + for { + select { +@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error { + // Get token for authentication + token, err := c.getToken() + if err != nil { ++ // telemetry: connection attempt failed before dialing ++ // site_id isn't globally available here; use client ID as site_id (low cardinality) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) + return fmt.Errorf("failed to get token: %w", err) + } + +@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error { + q.Set("clientType", c.clientType) + u.RawQuery = q.Encode() + +- // Connect to WebSocket ++ // Connect to WebSocket (optional span) ++ tr := otel.Tracer("newt") ++ spanCtx, span := tr.Start(context.Background(), "ws.connect") ++ defer span.End() ++ + dialer := websocket.DefaultDialer + + // Use new TLS configuration method +@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error { + logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") + } + +- conn, _, err := dialer.Dial(u.String(), nil) ++conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) + if err != nil { ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ etype := classifyConnError(err) ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) ++ // Map 
handshake-related errors to reconnect reasons where appropriate ++ if etype == "tls" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) ++ } else if etype == "timeout" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) ++ } else { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) ++ } + return fmt.Errorf("failed to connect to WebSocket: %w", err) + } + ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") + c.conn = conn + c.setConnected(true) + +diff --git a/wg/wg.go b/wg/wg.go +index 3cee1a9..a765279 100644 +--- a/wg/wg.go ++++ b/wg/wg.go +@@ -3,6 +3,7 @@ + package wg + + import ( ++ "context" + "encoding/json" + "errors" + "fmt" +@@ -23,6 +24,8 @@ import ( + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/wgctrl" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ "github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { + s.stopGetConfig = nil + } + ++ // telemetry: config reload success ++ telemetry.IncConfigReload(context.Background(), "success") ++ // Optional reconnect reason mapping: config change ++ if s.serverPubKey != "" { ++ telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) ++ } ++ + // Ensure the WireGuard interface and peers are configured + if err := s.ensureWireguardInterface(config); err != nil { + logger.Error("Failed to ensure WireGuard interface: %v", err) diff --git a/patches/02_reconnect_rtt.patch b/patches/02_reconnect_rtt.patch new file mode 100644 index 0000000..04a88f9 --- /dev/null +++ b/patches/02_reconnect_rtt.patch @@ -0,0 +1,466 @@ +diff --git a/main.go b/main.go +index 12849b1..c223b75 100644 +--- a/main.go ++++ b/main.go +@@ -1,7 +1,9 @@ + package main + + import ( ++ "context" + "encoding/json" ++ "errors" + 
"flag" + "fmt" + "net" +@@ -22,6 +24,9 @@ import ( + "github.com/fosrl/newt/updates" + "github.com/fosrl/newt/websocket" + ++ "github.com/fosrl/newt/internal/state" ++ "github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/device" + "golang.zx2c4.com/wireguard/tun" +@@ -116,6 +121,13 @@ var ( + healthMonitor *healthcheck.Monitor + enforceHealthcheckCert bool + ++ // Observability/metrics flags ++ metricsEnabled bool ++ otlpEnabled bool ++ adminAddr string ++ region string ++ metricsAsyncBytes bool ++ + // New mTLS configuration variables + tlsClientCert string + tlsClientKey string +@@ -126,6 +138,10 @@ var ( + ) + + func main() { ++ // Prepare context for graceful shutdown and signal handling ++ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) ++ defer stop() ++ + // if PANGOLIN_ENDPOINT, NEWT_ID, and NEWT_SECRET are set as environment variables, they will be used as default values + endpoint = os.Getenv("PANGOLIN_ENDPOINT") + id = os.Getenv("NEWT_ID") +@@ -141,6 +157,13 @@ func main() { + useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE") + enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT") + ++ // Metrics/observability env mirrors ++ metricsEnabledEnv := os.Getenv("NEWT_METRICS_PROMETHEUS_ENABLED") ++ otlpEnabledEnv := os.Getenv("NEWT_METRICS_OTLP_ENABLED") ++ adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") ++ regionEnv := os.Getenv("NEWT_REGION") ++ asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") ++ + keepInterface = keepInterfaceEnv == "true" + acceptClients = acceptClientsEnv == "true" + useNativeInterface = useNativeInterfaceEnv == "true" +@@ -272,6 +295,35 @@ func main() { + flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)") + } + ++ // Metrics/observability flags (mirror ENV if unset) ++ if metricsEnabledEnv == "" { ++ 
flag.BoolVar(&metricsEnabled, "metrics", true, "Enable Prometheus /metrics exporter") ++ } else { ++ if v, err := strconv.ParseBool(metricsEnabledEnv); err == nil { metricsEnabled = v } else { metricsEnabled = true } ++ } ++ if otlpEnabledEnv == "" { ++ flag.BoolVar(&otlpEnabled, "otlp", false, "Enable OTLP exporters (metrics/traces) to OTEL_EXPORTER_OTLP_ENDPOINT") ++ } else { ++ if v, err := strconv.ParseBool(otlpEnabledEnv); err == nil { otlpEnabled = v } ++ } ++ if adminAddrEnv == "" { ++ flag.StringVar(&adminAddr, "metrics-admin-addr", "127.0.0.1:2112", "Admin/metrics bind address") ++ } else { ++ adminAddr = adminAddrEnv ++ } ++ // Async bytes toggle ++ if asyncBytesEnv == "" { ++ flag.BoolVar(&metricsAsyncBytes, "metrics-async-bytes", false, "Enable async bytes counting (background flush; lower hot path overhead)") ++ } else { ++ if v, err := strconv.ParseBool(asyncBytesEnv); err == nil { metricsAsyncBytes = v } ++ } ++ // Optional region flag (resource attribute) ++ if regionEnv == "" { ++ flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") ++ } else { ++ region = regionEnv ++ } ++ + // do a --version check + version := flag.Bool("version", false, "Print the version") + +@@ -286,6 +338,50 @@ func main() { + loggerLevel := parseLogLevel(logLevel) + logger.GetLogger().SetLevel(parseLogLevel(logLevel)) + ++ // Initialize telemetry after flags are parsed (so flags override env) ++ tcfg := telemetry.FromEnv() ++ tcfg.PromEnabled = metricsEnabled ++ tcfg.OTLPEnabled = otlpEnabled ++ if adminAddr != "" { tcfg.AdminAddr = adminAddr } ++ // Resource attributes (if available) ++ tcfg.SiteID = id ++ tcfg.Region = region ++ // Build info ++ tcfg.BuildVersion = newtVersion ++ tcfg.BuildCommit = os.Getenv("NEWT_COMMIT") ++ ++ tel, telErr := telemetry.Init(ctx, tcfg) ++ if telErr != nil { ++ logger.Warn("Telemetry init failed: %v", telErr) ++ } ++ if tel != nil { ++ // Admin HTTP server (exposes /metrics when Prometheus exporter is 
enabled) ++ mux := http.NewServeMux() ++ mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }) ++ if tel.PrometheusHandler != nil { ++ mux.Handle("/metrics", tel.PrometheusHandler) ++ } ++ admin := &http.Server{ ++ Addr: tcfg.AdminAddr, ++ Handler: otelhttp.NewHandler(mux, "newt-admin"), ++ ReadTimeout: 5 * time.Second, ++ WriteTimeout: 10 * time.Second, ++ ReadHeaderTimeout: 5 * time.Second, ++ IdleTimeout: 30 * time.Second, ++ } ++ go func() { ++ if err := admin.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { ++ logger.Warn("admin http error: %v", err) ++ } ++ }() ++ defer func() { ++ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ++ defer cancel() ++ _ = admin.Shutdown(ctx) ++ }() ++ defer func() { _ = tel.Shutdown(context.Background()) }() ++ } ++ + newtVersion := "version_replaceme" + if *version { + fmt.Println("Newt version " + newtVersion) +@@ -557,7 +653,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + } + // Use reliable ping for initial connection test + logger.Debug("Testing initial connection with reliable ping...") +- _, err = reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) ++ lat, err := reliablePing(tnet, wgData.ServerIP, pingTimeout, 5) ++ if err == nil && wgData.PublicKey != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", wgData.PublicKey, "wireguard", lat.Seconds()) ++ } + if err != nil { + logger.Warn("Initial reliable ping failed, but continuing: %v", err) + } else { +@@ -570,14 +669,20 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + // as the pings will continue in the background + if !connected { + logger.Debug("Starting ping check") +- pingStopChan = startPingCheck(tnet, wgData.ServerIP, client) ++ pingStopChan = startPingCheck(tnet, wgData.ServerIP, client, wgData.PublicKey) + } + + // Create proxy manager + pm = proxy.NewProxyManager(tnet) ++ 
pm.SetAsyncBytes(metricsAsyncBytes) ++ // Set tunnel_id for metrics (WireGuard peer public key) ++ pm.SetTunnelID(wgData.PublicKey) + + connected = true + ++ // telemetry: record a successful site registration (omit region unless available) ++ telemetry.IncSiteRegistration(context.Background(), id, "", "success") ++ + // add the targets if there are any + if len(wgData.Targets.TCP) > 0 { + updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: wgData.Targets.TCP}) +@@ -611,10 +716,25 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) { + logger.Info("Received reconnect message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() + ++ // Clear metrics attrs and sessions for the tunnel ++ if pm != nil { ++ pm.ClearTunnelID() ++ state.Global().ClearTunnel(wgData.PublicKey) ++ } ++ ++ // Clear metrics attrs and sessions for the tunnel ++ if pm != nil { ++ pm.ClearTunnelID() ++ state.Global().ClearTunnel(wgData.PublicKey) ++ } ++ + // Mark as disconnected + connected = false + +@@ -631,6 +751,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub + + client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { + logger.Info("Received termination message") ++ if wgData.PublicKey != "" { ++ telemetry.IncReconnect(context.Background(), "", wgData.PublicKey, "server_request") ++ } + + // Close the WireGuard device and TUN + closeWgTunnel() +diff --git a/util.go b/util.go +index 7d6da4f..c1f4915 100644 +--- a/util.go ++++ b/util.go +@@ -17,6 +17,7 @@ import ( + "github.com/fosrl/newt/logger" + "github.com/fosrl/newt/proxy" + "github.com/fosrl/newt/websocket" ++ "github.com/fosrl/newt/internal/telemetry" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + 
"golang.zx2c4.com/wireguard/device" +@@ -229,7 +230,7 @@ func pingWithRetry(tnet *netstack.Net, dst string, timeout time.Duration) (stopC + return stopChan, fmt.Errorf("initial ping attempts failed, continuing in background") + } + +-func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client) chan struct{} { ++func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Client, tunnelID string) chan struct{} { + maxInterval := 6 * time.Second + currentInterval := pingInterval + consecutiveFailures := 0 +@@ -292,6 +293,9 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + if !connectionLost { + connectionLost = true + logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) ++ if tunnelID != "" { ++ telemetry.IncReconnect(context.Background(), "", tunnelID, telemetry.ReasonTimeout) ++ } + stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) + // Send registration message to the server for backward compatibility + err := client.SendMessage("newt/wg/register", map[string]interface{}{ +@@ -318,6 +322,10 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien + } else { + // Track recent latencies + recentLatencies = append(recentLatencies, latency) ++ // Record tunnel latency (limit sampling to this periodic check) ++ if tunnelID != "" { ++ telemetry.ObserveTunnelLatency(context.Background(), "", tunnelID, "wireguard", latency.Seconds()) ++ } + if len(recentLatencies) > 10 { + recentLatencies = recentLatencies[1:] + } +diff --git a/websocket/client.go b/websocket/client.go +index 0c0664a..c9ac264 100644 +--- a/websocket/client.go ++++ b/websocket/client.go +@@ -18,6 +18,10 @@ import ( + + "github.com/fosrl/newt/logger" + "github.com/gorilla/websocket" ++ ++ "context" ++ "github.com/fosrl/newt/internal/telemetry" ++ "go.opentelemetry.io/otel" + ) + + type 
Client struct { +@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) { + } + resp, err := client.Do(req) + if err != nil { ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err)) + return "", fmt.Errorf("failed to request new token: %w", err) + } + defer resp.Body.Close() +@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) { + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure") ++ bin := "http_other" ++ if resp.StatusCode >= 500 { ++ bin = "http_5xx" ++ } else if resp.StatusCode >= 400 { ++ bin = "http_4xx" ++ } ++ telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin) ++ // Reconnect reason mapping for auth failures ++ if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError) ++ } + return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) + } + +@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) { + } + + logger.Debug("Received token: %s", tokenResp.Data.Token) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success") + + return tokenResp.Data.Token, nil + } + ++// classifyConnError maps common errors to low-cardinality error_type labels ++func classifyConnError(err error) string { ++ if err == nil { ++ return "" ++ } ++ msg := strings.ToLower(err.Error()) ++ switch { ++ case strings.Contains(msg, "tls") || strings.Contains(msg, "certificate"): ++ return "tls" ++ case strings.Contains(msg, "timeout") || strings.Contains(msg, "i/o timeout"): ++ return "timeout" ++ case strings.Contains(msg, "no such host") || strings.Contains(msg, "dns"): ++ return "dns" ++ case 
strings.Contains(msg, "unauthorized") || strings.Contains(msg, "forbidden"): ++ return "auth" ++ case strings.Contains(msg, "broken pipe") || strings.Contains(msg, "connection reset") || strings.Contains(msg, "connection refused") || strings.Contains(msg, "use of closed network connection") || strings.Contains(msg, "network is unreachable"): ++ return "io" ++ default: ++ return "other" ++ } ++} ++ + func (c *Client) connectWithRetry() { + for { + select { +@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error { + // Get token for authentication + token, err := c.getToken() + if err != nil { ++ // telemetry: connection attempt failed before dialing ++ // site_id isn't globally available here; use client ID as site_id (low cardinality) ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err)) + return fmt.Errorf("failed to get token: %w", err) + } + +@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error { + q.Set("clientType", c.clientType) + u.RawQuery = q.Encode() + +- // Connect to WebSocket ++ // Connect to WebSocket (optional span) ++ tr := otel.Tracer("newt") ++ spanCtx, span := tr.Start(context.Background(), "ws.connect") ++ defer span.End() ++ + dialer := websocket.DefaultDialer + + // Use new TLS configuration method +@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error { + logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") + } + +- conn, _, err := dialer.Dial(u.String(), nil) ++conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) + if err != nil { ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure") ++ etype := classifyConnError(err) ++ telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype) ++ // Map handshake-related errors to reconnect reasons where appropriate ++ if etype == 
"tls" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError) ++ } else if etype == "timeout" { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout) ++ } else { ++ telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError) ++ } + return fmt.Errorf("failed to connect to WebSocket: %w", err) + } + ++ telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success") + c.conn = conn + c.setConnected(true) + +diff --git a/wg/wg.go b/wg/wg.go +index 3cee1a9..a765279 100644 +--- a/wg/wg.go ++++ b/wg/wg.go +@@ -3,6 +3,7 @@ + package wg + + import ( ++ "context" + "encoding/json" + "errors" + "fmt" +@@ -23,6 +24,8 @@ import ( + "golang.zx2c4.com/wireguard/conn" + "golang.zx2c4.com/wireguard/wgctrl" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ "github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -298,6 +301,13 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { + s.stopGetConfig = nil + } + ++ // telemetry: config reload success ++ telemetry.IncConfigReload(context.Background(), "success") ++ // Optional reconnect reason mapping: config change ++ if s.serverPubKey != "" { ++ telemetry.IncReconnect(context.Background(), "", s.serverPubKey, telemetry.ReasonConfigChange) ++ } ++ + // Ensure the WireGuard interface and peers are configured + if err := s.ensureWireguardInterface(config); err != nil { + logger.Error("Failed to ensure WireGuard interface: %v", err) +diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go +index 6684c40..09f160e 100644 +--- a/wgnetstack/wgnetstack.go ++++ b/wgnetstack/wgnetstack.go +@@ -1,6 +1,7 @@ + package wgnetstack + + import ( ++ "context" + "crypto/rand" + "encoding/base64" + "encoding/hex" +@@ -26,6 +27,8 @@ import ( + "golang.zx2c4.com/wireguard/tun" + "golang.zx2c4.com/wireguard/tun/netstack" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ 
"github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -240,14 +243,20 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str + return service, nil + } + ++// ReportRTT allows reporting native RTTs to telemetry, rate-limited externally. ++func (s *WireGuardService) ReportRTT(seconds float64) { ++ if s.serverPubKey == "" { return } ++ telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds) ++} ++ + func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) { + logger.Debug("Received: %+v", msg) + + // if there is no wgData or pm, we can't add targets + if s.TunnelIP == "" || s.proxyManager == nil { + logger.Info("No tunnel IP or proxy manager available") +- return +- } ++ return ++} + + targetData, err := parseTargetData(msg.Data) + if err != nil { diff --git a/patches/03_constants_docs.patch b/patches/03_constants_docs.patch new file mode 100644 index 0000000..e69de29 diff --git a/patches/03_wg_rtt_hook.patch b/patches/03_wg_rtt_hook.patch new file mode 100644 index 0000000..4d5eb19 --- /dev/null +++ b/patches/03_wg_rtt_hook.patch @@ -0,0 +1,44 @@ +diff --git a/wgnetstack/wgnetstack.go b/wgnetstack/wgnetstack.go +index 6684c40..09f160e 100644 +--- a/wgnetstack/wgnetstack.go ++++ b/wgnetstack/wgnetstack.go +@@ -1,6 +1,7 @@ + package wgnetstack + + import ( ++ "context" + "crypto/rand" + "encoding/base64" + "encoding/hex" +@@ -26,6 +27,8 @@ import ( + "golang.zx2c4.com/wireguard/tun" + "golang.zx2c4.com/wireguard/tun/netstack" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ++ ++ "github.com/fosrl/newt/internal/telemetry" + ) + + type WgConfig struct { +@@ -240,14 +243,20 @@ func NewWireGuardService(interfaceName string, mtu int, generateAndSaveKeyTo str + return service, nil + } + ++// ReportRTT allows reporting native RTTs to telemetry, rate-limited externally. 
++func (s *WireGuardService) ReportRTT(seconds float64) { ++ if s.serverPubKey == "" { return } ++ telemetry.ObserveTunnelLatency(context.Background(), "", s.serverPubKey, "wireguard", seconds) ++} ++ + func (s *WireGuardService) addTcpTarget(msg websocket.WSMessage) { + logger.Debug("Received: %+v", msg) + + // if there is no wgData or pm, we can't add targets + if s.TunnelIP == "" || s.proxyManager == nil { + logger.Info("No tunnel IP or proxy manager available") +- return +- } ++ return ++} + + targetData, err := parseTargetData(msg.Data) + if err != nil { diff --git a/patches/04_tests_docs.patch b/patches/04_tests_docs.patch new file mode 100644 index 0000000..e69de29 diff --git a/patches/HOWTO-APPLY.md b/patches/HOWTO-APPLY.md new file mode 100644 index 0000000..aaf0e53 --- /dev/null +++ b/patches/HOWTO-APPLY.md @@ -0,0 +1,25 @@ +# How to apply patches + +These patches were generated from the working tree without commits. You can apply them in one shot or in topic order. + +One shot (recommended during review): + +```bash +git apply patches/00_all_changes.patch +``` + +Topic order: + +```bash +git apply patches/01_proxy_multitunnel.patch +git apply patches/02_reconnect_rtt.patch +git apply patches/03_constants_docs.patch +``` + +Rollback (restore to HEAD and clean untracked files): + +```bash +git restore --source=HEAD --worktree --staged . +git clean -fd +``` +