Files
newt/patches/01_proxy_multitunnel.patch
Marc Schäfer b53fb70778 feat: Implement telemetry for reconnect reasons and RTT reporting
- Added telemetry hooks to track reconnect reasons for WireGuard connections, including server requests and authentication errors.
- Introduced RTT reporting to telemetry for better latency monitoring.
- Enhanced metrics configuration with flags for Prometheus and OTLP exporters.
- Implemented graceful shutdown and signal handling in the main application.
- Updated WebSocket client to classify connection errors and report them to telemetry.
- Added support for async byte counting in metrics.
- Improved handling of reconnect scenarios in the WireGuard service.
- Added documentation for applying patches and rollback procedures.
2025-10-07 09:17:05 +02:00

302 lines
9.9 KiB
Diff

diff --git a/proxy/manager.go b/proxy/manager.go
index bf10322..86c47a8 100644
--- a/proxy/manager.go
+++ b/proxy/manager.go
@@ -1,16 +1,22 @@
package proxy
import (
+ "context"
"fmt"
"io"
"net"
+ "os"
"strings"
"sync"
+ "sync/atomic"
"time"
+ "github.com/fosrl/newt/internal/state"
+ "github.com/fosrl/newt/internal/telemetry"
"github.com/fosrl/newt/logger"
"golang.zx2c4.com/wireguard/tun/netstack"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
+ "go.opentelemetry.io/otel/attribute"
)
// Target represents a proxy target with its address and port
@@ -28,6 +34,52 @@ type ProxyManager struct {
udpConns []*gonet.UDPConn
running bool
mutex sync.RWMutex
+
+ // telemetry (multi-tunnel)
+ currentTunnelID string
+ tunnels map[string]*tunnelEntry
+ asyncBytes bool
+ flushStop chan struct{}
+}
+
+// tunnelEntry holds per-tunnel attributes and (optional) async counters.
+type tunnelEntry struct {
+ attrInTCP attribute.Set
+ attrOutTCP attribute.Set
+ attrInUDP attribute.Set
+ attrOutUDP attribute.Set
+
+ bytesInTCP atomic.Uint64
+ bytesOutTCP atomic.Uint64
+ bytesInUDP atomic.Uint64
+ bytesOutUDP atomic.Uint64
+}
+
+// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set.
+type countingWriter struct {
+ ctx context.Context
+ w io.Writer
+ set attribute.Set
+ pm *ProxyManager
+ ent *tunnelEntry
+ out bool // false=in, true=out
+ proto string // "tcp" or "udp"
+}
+
+func (cw *countingWriter) Write(p []byte) (int, error) {
+ n, err := cw.w.Write(p)
+ if n > 0 {
+ if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil {
+ if cw.proto == "tcp" {
+ if cw.out { cw.ent.bytesOutTCP.Add(uint64(n)) } else { cw.ent.bytesInTCP.Add(uint64(n)) }
+ } else if cw.proto == "udp" {
+ if cw.out { cw.ent.bytesOutUDP.Add(uint64(n)) } else { cw.ent.bytesInUDP.Add(uint64(n)) }
+ }
+ } else {
+ telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set)
+ }
+ }
+ return n, err
}
// NewProxyManager creates a new proxy manager instance
@@ -38,9 +90,46 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
udpTargets: make(map[string]map[int]string),
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
+ tunnels: make(map[string]*tunnelEntry),
}
}
+// SetTunnelID sets the WireGuard peer public key used as tunnel_id label.
+func (pm *ProxyManager) SetTunnelID(id string) {
+ pm.mutex.Lock()
+ defer pm.mutex.Unlock()
+ pm.currentTunnelID = id
+ if _, ok := pm.tunnels[id]; !ok {
+ pm.tunnels[id] = &tunnelEntry{}
+ }
+ e := pm.tunnels[id]
+ e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp"))
+ e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp"))
+ e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp"))
+ e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp"))
+}
+
+// ClearTunnelID clears cached attribute sets for the current tunnel.
+func (pm *ProxyManager) ClearTunnelID() {
+ pm.mutex.Lock()
+ defer pm.mutex.Unlock()
+ id := pm.currentTunnelID
+ if id == "" { return }
+ if e, ok := pm.tunnels[id]; ok {
+ // final flush for this tunnel
+ inTCP := e.bytesInTCP.Swap(0)
+ outTCP := e.bytesOutTCP.Swap(0)
+ inUDP := e.bytesInUDP.Swap(0)
+ outUDP := e.bytesOutUDP.Swap(0)
+ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) }
+ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) }
+ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) }
+ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) }
+ delete(pm.tunnels, id)
+ }
+ pm.currentTunnelID = ""
+}
+
// init function without tnet
func NewProxyManagerWithoutTNet() *ProxyManager {
return &ProxyManager{
@@ -160,6 +249,57 @@ func (pm *ProxyManager) Start() error {
return nil
}
+func (pm *ProxyManager) SetAsyncBytes(b bool) {
+ pm.mutex.Lock()
+ defer pm.mutex.Unlock()
+ pm.asyncBytes = b
+ if b && pm.flushStop == nil {
+ pm.flushStop = make(chan struct{})
+ go pm.flushLoop()
+ }
+}
+func (pm *ProxyManager) flushLoop() {
+ flushInterval := 2 * time.Second
+ if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
+ if d, err := time.ParseDuration(v); err == nil && d > 0 {
+ if d/2 < flushInterval { flushInterval = d / 2 }
+ }
+ }
+ ticker := time.NewTicker(flushInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ pm.mutex.RLock()
+ for _, e := range pm.tunnels {
+ inTCP := e.bytesInTCP.Swap(0)
+ outTCP := e.bytesOutTCP.Swap(0)
+ inUDP := e.bytesInUDP.Swap(0)
+ outUDP := e.bytesOutUDP.Swap(0)
+ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) }
+ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) }
+ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) }
+ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) }
+ }
+ pm.mutex.RUnlock()
+ case <-pm.flushStop:
+ pm.mutex.RLock()
+ for _, e := range pm.tunnels {
+ inTCP := e.bytesInTCP.Swap(0)
+ outTCP := e.bytesOutTCP.Swap(0)
+ inUDP := e.bytesInUDP.Swap(0)
+ outUDP := e.bytesOutUDP.Swap(0)
+ if inTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) }
+ if outTCP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) }
+ if inUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) }
+ if outUDP > 0 { telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) }
+ }
+ pm.mutex.RUnlock()
+ return
+ }
+ }
+}
+
func (pm *ProxyManager) Stop() error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
@@ -236,6 +376,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr
return nil
}
+// getEntry returns per-tunnel entry or nil.
+func (pm *ProxyManager) getEntry(id string) *tunnelEntry {
+ pm.mutex.RLock()
+ e := pm.tunnels[id]
+ pm.mutex.RUnlock()
+ return e
+}
+
func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) {
for {
conn, err := listener.Accept()
@@ -257,6 +405,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
continue
}
+// Count sessions only once per accepted TCP connection
+ if pm.tunnelID != "" { state.Global().IncSessions(pm.tunnelID) }
+
go func() {
target, err := net.Dial("tcp", targetAddr)
if err != nil {
@@ -265,24 +416,33 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
return
}
+ // already incremented on accept
+
// Create a WaitGroup to ensure both copy operations complete
var wg sync.WaitGroup
wg.Add(2)
+ // client -> target (direction=in)
go func() {
defer wg.Done()
- io.Copy(target, conn)
- target.Close()
+e := pm.getEntry(pm.currentTunnelID)
+cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"}
+ _, _ = io.Copy(cw, conn)
+ _ = target.Close()
}()
+ // target -> client (direction=out)
go func() {
defer wg.Done()
- io.Copy(conn, target)
- conn.Close()
+e := pm.getEntry(pm.currentTunnelID)
+cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"}
+ _, _ = io.Copy(cw, target)
+ _ = conn.Close()
}()
- // Wait for both copies to complete
+ // Wait for both copies to complete then session -1
wg.Wait()
+ if pm.tunnelID != "" { state.Global().DecSessions(pm.tunnelID) }
}()
}
}
@@ -326,6 +486,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}
clientKey := remoteAddr.String()
+ // bytes from client -> target (direction=in)
+if pm.currentTunnelID != "" && n > 0 {
+if pm.asyncBytes {
+ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(n)) }
+ } else {
+ if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP) }
+ }
+ }
clientsMutex.RLock()
targetConn, exists := clientConns[clientKey]
clientsMutex.RUnlock()
@@ -366,6 +534,15 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
return // defer will handle cleanup
}
+ // bytes from target -> client (direction=out)
+ if pm.currentTunnelID != "" && n > 0 {
+ if pm.asyncBytes {
+ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesOutUDP.Add(uint64(n)) }
+ } else {
+if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP) }
+ }
+ }
+
_, err = conn.WriteTo(buffer[:n], remoteAddr)
if err != nil {
logger.Error("Error writing to client: %v", err)
@@ -375,13 +552,19 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}(clientKey, targetConn, remoteAddr)
}
- _, err = targetConn.Write(buffer[:n])
+ written, err := targetConn.Write(buffer[:n])
if err != nil {
logger.Error("Error writing to target: %v", err)
targetConn.Close()
clientsMutex.Lock()
delete(clientConns, clientKey)
clientsMutex.Unlock()
+} else if pm.currentTunnelID != "" && written > 0 {
+ if pm.asyncBytes {
+ if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written)) }
+ } else {
+if e := pm.getEntry(pm.currentTunnelID); e != nil { telemetry.AddTunnelBytesSet(context.Background(), int64(written), e.attrInUDP) }
+ }
}
}
}