Refactor ProxyManager for per-tunnel metrics, async bytes collection, and session counting

This commit is contained in:
Marc Schäfer
2025-10-07 09:13:03 +02:00
parent 496ff0734c
commit ceef228665

View File

@@ -1,14 +1,20 @@
package proxy
import (
"context"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/fosrl/newt/internal/state"
"github.com/fosrl/newt/internal/telemetry"
"github.com/fosrl/newt/logger"
"go.opentelemetry.io/otel/attribute"
"golang.zx2c4.com/wireguard/tun/netstack"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
)
@@ -28,6 +34,60 @@ type ProxyManager struct {
udpConns []*gonet.UDPConn
running bool
mutex sync.RWMutex
// telemetry (multi-tunnel)
currentTunnelID string
tunnels map[string]*tunnelEntry
asyncBytes bool
flushStop chan struct{}
}
// tunnelEntry holds per-tunnel attributes and (optional) async counters.
type tunnelEntry struct {
attrInTCP attribute.Set
attrOutTCP attribute.Set
attrInUDP attribute.Set
attrOutUDP attribute.Set
bytesInTCP atomic.Uint64
bytesOutTCP atomic.Uint64
bytesInUDP atomic.Uint64
bytesOutUDP atomic.Uint64
}
// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set.
type countingWriter struct {
ctx context.Context
w io.Writer
set attribute.Set
pm *ProxyManager
ent *tunnelEntry
out bool // false=in, true=out
proto string // "tcp" or "udp"
}
func (cw *countingWriter) Write(p []byte) (int, error) {
n, err := cw.w.Write(p)
if n > 0 {
if cw.pm != nil && cw.pm.asyncBytes && cw.ent != nil {
if cw.proto == "tcp" {
if cw.out {
cw.ent.bytesOutTCP.Add(uint64(n))
} else {
cw.ent.bytesInTCP.Add(uint64(n))
}
} else if cw.proto == "udp" {
if cw.out {
cw.ent.bytesOutUDP.Add(uint64(n))
} else {
cw.ent.bytesInUDP.Add(uint64(n))
}
}
} else {
telemetry.AddTunnelBytesSet(cw.ctx, int64(n), cw.set)
}
}
return n, err
}
// NewProxyManager creates a new proxy manager instance
@@ -38,9 +98,56 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
udpTargets: make(map[string]map[int]string),
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
tunnels: make(map[string]*tunnelEntry),
}
}
// SetTunnelID sets the WireGuard peer public key used as tunnel_id label.
func (pm *ProxyManager) SetTunnelID(id string) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.currentTunnelID = id
if _, ok := pm.tunnels[id]; !ok {
pm.tunnels[id] = &tunnelEntry{}
}
e := pm.tunnels[id]
e.attrInTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "tcp"))
e.attrOutTCP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "tcp"))
e.attrInUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "in"), attribute.String("protocol", "udp"))
e.attrOutUDP = attribute.NewSet(attribute.String("tunnel_id", id), attribute.String("direction", "out"), attribute.String("protocol", "udp"))
}
// ClearTunnelID clears cached attribute sets for the current tunnel.
func (pm *ProxyManager) ClearTunnelID() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
id := pm.currentTunnelID
if id == "" {
return
}
if e, ok := pm.tunnels[id]; ok {
// final flush for this tunnel
inTCP := e.bytesInTCP.Swap(0)
outTCP := e.bytesOutTCP.Swap(0)
inUDP := e.bytesInUDP.Swap(0)
outUDP := e.bytesOutUDP.Swap(0)
if inTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP)
}
if outTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP)
}
if inUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP)
}
if outUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP)
}
delete(pm.tunnels, id)
}
pm.currentTunnelID = ""
}
// init function without tnet
func NewProxyManagerWithoutTNet() *ProxyManager {
return &ProxyManager{
@@ -160,6 +267,75 @@ func (pm *ProxyManager) Start() error {
return nil
}
func (pm *ProxyManager) SetAsyncBytes(b bool) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.asyncBytes = b
if b && pm.flushStop == nil {
pm.flushStop = make(chan struct{})
go pm.flushLoop()
}
}
func (pm *ProxyManager) flushLoop() {
flushInterval := 2 * time.Second
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
if d/2 < flushInterval {
flushInterval = d / 2
}
}
}
ticker := time.NewTicker(flushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
pm.mutex.RLock()
for _, e := range pm.tunnels {
inTCP := e.bytesInTCP.Swap(0)
outTCP := e.bytesOutTCP.Swap(0)
inUDP := e.bytesInUDP.Swap(0)
outUDP := e.bytesOutUDP.Swap(0)
if inTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP)
}
if outTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP)
}
if inUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP)
}
if outUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP)
}
}
pm.mutex.RUnlock()
case <-pm.flushStop:
pm.mutex.RLock()
for _, e := range pm.tunnels {
inTCP := e.bytesInTCP.Swap(0)
outTCP := e.bytesOutTCP.Swap(0)
inUDP := e.bytesInUDP.Swap(0)
outUDP := e.bytesOutUDP.Swap(0)
if inTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inTCP), e.attrInTCP)
}
if outTCP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outTCP), e.attrOutTCP)
}
if inUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(inUDP), e.attrInUDP)
}
if outUDP > 0 {
telemetry.AddTunnelBytesSet(context.Background(), int64(outUDP), e.attrOutUDP)
}
}
pm.mutex.RUnlock()
return
}
}
}
func (pm *ProxyManager) Stop() error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
@@ -236,6 +412,14 @@ func (pm *ProxyManager) startTarget(proto, listenIP string, port int, targetAddr
return nil
}
// getEntry returns per-tunnel entry or nil.
func (pm *ProxyManager) getEntry(id string) *tunnelEntry {
pm.mutex.RLock()
e := pm.tunnels[id]
pm.mutex.RUnlock()
return e
}
func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) {
for {
conn, err := listener.Accept()
@@ -257,6 +441,11 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
continue
}
// Count sessions only once per accepted TCP connection
if pm.currentTunnelID != "" {
state.Global().IncSessions(pm.currentTunnelID)
}
go func() {
target, err := net.Dial("tcp", targetAddr)
if err != nil {
@@ -265,24 +454,35 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
return
}
// already incremented on accept
// Create a WaitGroup to ensure both copy operations complete
var wg sync.WaitGroup
wg.Add(2)
// client -> target (direction=in)
go func() {
defer wg.Done()
io.Copy(target, conn)
target.Close()
e := pm.getEntry(pm.currentTunnelID)
cw := &countingWriter{ctx: context.Background(), w: target, set: e.attrInTCP, pm: pm, ent: e, out: false, proto: "tcp"}
_, _ = io.Copy(cw, conn)
_ = target.Close()
}()
// target -> client (direction=out)
go func() {
defer wg.Done()
io.Copy(conn, target)
conn.Close()
e := pm.getEntry(pm.currentTunnelID)
cw := &countingWriter{ctx: context.Background(), w: conn, set: e.attrOutTCP, pm: pm, ent: e, out: true, proto: "tcp"}
_, _ = io.Copy(cw, target)
_ = conn.Close()
}()
// Wait for both copies to complete
// Wait for both copies to complete then session -1
wg.Wait()
if pm.currentTunnelID != "" {
state.Global().DecSessions(pm.currentTunnelID)
}
}()
}
}
@@ -326,6 +526,18 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}
clientKey := remoteAddr.String()
// bytes from client -> target (direction=in)
if pm.currentTunnelID != "" && n > 0 {
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(n))
}
} else {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrInUDP)
}
}
}
clientsMutex.RLock()
targetConn, exists := clientConns[clientKey]
clientsMutex.RUnlock()
@@ -366,6 +578,19 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
return // defer will handle cleanup
}
// bytes from target -> client (direction=out)
if pm.currentTunnelID != "" && n > 0 {
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesOutUDP.Add(uint64(n))
}
} else {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
telemetry.AddTunnelBytesSet(context.Background(), int64(n), e.attrOutUDP)
}
}
}
_, err = conn.WriteTo(buffer[:n], remoteAddr)
if err != nil {
logger.Error("Error writing to client: %v", err)
@@ -375,13 +600,23 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
}(clientKey, targetConn, remoteAddr)
}
_, err = targetConn.Write(buffer[:n])
written, err := targetConn.Write(buffer[:n])
if err != nil {
logger.Error("Error writing to target: %v", err)
targetConn.Close()
clientsMutex.Lock()
delete(clientConns, clientKey)
clientsMutex.Unlock()
} else if pm.currentTunnelID != "" && written > 0 {
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(written))
}
} else {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
telemetry.AddTunnelBytesSet(context.Background(), int64(written), e.attrInUDP)
}
}
}
}
}