From ceef228665e88233b159b9273adf75d454ea4723 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc=20Sch=C3=A4fer?= Date: Tue, 7 Oct 2025 09:13:03 +0200 Subject: [PATCH] Refactor ProxyManager for per-tunnel metrics, async bytes collection, and session counting --- proxy/manager.go | 247 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 241 insertions(+), 6 deletions(-) diff --git a/proxy/manager.go b/proxy/manager.go index bf10322..e2b7a79 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -1,14 +1,20 @@ package proxy import ( + "context" "fmt" "io" "net" + "os" "strings" "sync" + "sync/atomic" "time" + "github.com/fosrl/newt/internal/state" + "github.com/fosrl/newt/internal/telemetry" "github.com/fosrl/newt/logger" + "go.opentelemetry.io/otel/attribute" "golang.zx2c4.com/wireguard/tun/netstack" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" ) @@ -28,6 +34,60 @@ type ProxyManager struct { udpConns []*gonet.UDPConn running bool mutex sync.RWMutex + + // telemetry (multi-tunnel) + currentTunnelID string + tunnels map[string]*tunnelEntry + asyncBytes bool + flushStop chan struct{} +} + +// tunnelEntry holds per-tunnel attributes and (optional) async counters. +type tunnelEntry struct { + attrInTCP attribute.Set + attrOutTCP attribute.Set + attrInUDP attribute.Set + attrOutUDP attribute.Set + + bytesInTCP atomic.Uint64 + bytesOutTCP atomic.Uint64 + bytesInUDP atomic.Uint64 + bytesOutUDP atomic.Uint64 +} + +// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set. +type countingWriter struct { + ctx context.Context + w io.Writer + set attribute.Set + pm *ProxyManager + ent *tunnelEntry + out bool // false=in, true=out + proto string // "tcp" or "udp" +} + +func (cw *countingWriter) Write(p []byte) (int, error) { + n, err := cw.w.Write(p) + if n > 0 { + if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil { + if cw.proto == "tcp" { + if cw.out { + cw.ent.bytesOutTCP.Add(uint64(n)) + } else { + cw.ent.bytesInTCP.Add(uint64(n)) + } + } else if cw.proto == "udp" { + if cw.out { + cw.ent.bytesOutUDP.Add(uint64(n)) + } else { + cw.ent.bytesInUDP.Add(uint64(n)) + } + } + } else { + telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set) + } + } + return n, err } // NewProxyManager creates a new proxy manager instance @@ -38,9 +98,56 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager { udpTargets: make(map[string]map[int]string), listeners: make([]*gonet.TCPListener, 0), udpConns: make([]*gonet.UDPConn, 0), + tunnels: make(map[string]*tunnelEntry), } } +// SetTunnelID sets the WireGuard peer public key used as tunnel_id label. +func (pm *ProxyManager) SetTunnelID(id string) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + pm.currentTunnelID = id + if _, ok := pm.tunnels[id]; !ok { + pm.tunnels[id] = &tunnelEntry{} + } + e := pm.tunnels[id] + e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp")) + e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp")) + e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp")) + e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp")) +} + +// ClearTunnelID clears cached attribute sets for the current tunnel. +func (pm *ProxyManager) ClearTunnelID() { + pm.mutex.Lock() + defer pm.mutex.Unlock() + id := pm.currentTunnelID + if id == "" { + return + } + if e, ok := pm.tunnels[id]; ok { + // final flush for this tunnel + inTCP := e.bytesInTCP.Swap(0) + outTCP := e.bytesOutTCP.Swap(0) + inUDP := e.bytesInUDP.Swap(0) + outUDP := e.bytesOutUDP.Swap(0) + if inTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) + } + if outTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) + } + if inUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) + } + if outUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) + } + delete(pm.tunnels, id) + } + pm.currentTunnelID = "" +} + // init function without tnet func NewProxyManagerWithoutTNet() *ProxyManager { return &ProxyManager{ @@ -160,6 +267,75 @@ func (pm *ProxyManager) Start() error { return nil } +func (pm *ProxyManager) SetAsyncBytes(b bool) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + pm.asyncBytes = b + if b && pm.flushStop == nil { + pm.flushStop = make(chan struct{}) + go pm.flushLoop() + } +} +func (pm *ProxyManager) flushLoop() { + flushInterval := 2 * time.Second + if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" { + if d, err := time.ParseDuration(v); err == nil && d > 0 { + if d/2 < flushInterval { + flushInterval = d / 2 + } + } + } + ticker := time.NewTicker(flushInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + pm.mutex.RLock() + for _, e := range pm.tunnels { + inTCP := e.bytesInTCP.Swap(0) + outTCP := e.bytesOutTCP.Swap(0) + inUDP := e.bytesInUDP.Swap(0) + outUDP := e.bytesOutUDP.Swap(0) + if inTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) + } + if outTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) + } + if inUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) + } + if outUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) + } + } + pm.mutex.RUnlock() + case <-pm.flushStop: + pm.mutex.RLock() + for _, e := range pm.tunnels { + inTCP := e.bytesInTCP.Swap(0) + outTCP := e.bytesOutTCP.Swap(0) + inUDP := e.bytesInUDP.Swap(0) + outUDP := e.bytesOutUDP.Swap(0) + if inTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP) + } + if outTCP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP) + } + if inUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP) + } + if outUDP > 0 { + telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP) + } + } + pm.mutex.RUnlock() + return + } + } +} + func (pm *ProxyManager) Stop() error { pm.mutex.Lock() defer pm.mutex.Unlock() @@ -236,6 +412,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr return nil } +// getEntry returns per-tunnel entry or nil. +func (pm *ProxyManager) getEntry(id string) *tunnelEntry { + pm.mutex.RLock() + e := pm.tunnels[id] + pm.mutex.RUnlock() + return e +} + func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) { for { conn, err := listener.Accept() @@ -257,6 +441,11 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) continue } + // Count sessions only once per accepted TCP connection + if pm.currentTunnelID != "" { + state.Global().IncSessions(pm.currentTunnelID) + } + go func() { target, err := net.Dial("tcp", targetAddr) if err != nil { @@ -265,24 +454,35 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) return } + // already incremented on accept + // Create a WaitGroup to ensure both copy operations complete var wg sync.WaitGroup wg.Add(2) + // client -> target (direction=in) go func() { defer wg.Done() - io.Copy(target, conn) - target.Close() + e := pm.getEntry(pm.currentTunnelID) + cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"} + _, _ = io.Copy(cw, conn) + _ = target.Close() }() + // target -> client (direction=out) go func() { defer wg.Done() - io.Copy(conn, target) - conn.Close() + e := pm.getEntry(pm.currentTunnelID) + cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"} + _, _ = io.Copy(cw, target) + _ = conn.Close() }() - // Wait for both copies to complete + // Wait for both copies to complete then session -1 wg.Wait() + if pm.currentTunnelID != "" { + state.Global().DecSessions(pm.currentTunnelID) + } }() } } @@ -326,6 +526,18 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { } clientKey := remoteAddr.String() + // bytes from client -> target (direction=in) + if pm.currentTunnelID != "" && n > 0 { + if pm.asyncBytes { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.bytesInUDP.Add(uint64(n)) + } + } else { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP) + } + } + } clientsMutex.RLock() targetConn, exists := clientConns[clientKey] clientsMutex.RUnlock() @@ -366,6 +578,19 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { return // defer will handle cleanup } + // bytes from target -> client (direction=out) + if pm.currentTunnelID != "" && n > 0 { + if pm.asyncBytes { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.bytesOutUDP.Add(uint64(n)) + } + } else { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP) + } + } + } + _, err = conn.WriteTo(buffer[:n], remoteAddr) if err != nil { logger.Error("Error writing to client: %v", err) @@ -375,13 +600,23 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { }(clientKey, targetConn, remoteAddr) } - _, err = targetConn.Write(buffer[:n]) + written, err := targetConn.Write(buffer[:n]) if err != nil { logger.Error("Error writing to target: %v", err) targetConn.Close() clientsMutex.Lock() delete(clientConns, clientKey) clientsMutex.Unlock() + } else if pm.currentTunnelID != "" && written > 0 { + if pm.asyncBytes { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.bytesInUDP.Add(uint64(written)) + } + } else { + if e := pm.getEntry(pm.currentTunnelID); e != nil { + telemetry.AddTunnelBytesSet(context.Background(), int64(written), e.attrInUDP) + } + } } } }