fix(proxy): reclaim idle UDP flows and make timeout configurable

This commit is contained in:
Laurence
2026-04-09 15:45:55 +01:00
parent d7c3c38d24
commit 31f899588f
2 changed files with 40 additions and 0 deletions

16
main.go
View File

@@ -129,6 +129,7 @@ var (
dockerEnforceNetworkValidationBool bool
pingInterval time.Duration
pingTimeout time.Duration
udpProxyIdleTimeout time.Duration
publicKey wgtypes.Key
pingStopChan chan struct{}
stopFunc func()
@@ -261,6 +262,7 @@ func runNewtMain(ctx context.Context) {
dockerSocket = os.Getenv("DOCKER_SOCKET")
pingIntervalStr := os.Getenv("PING_INTERVAL")
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
udpProxyIdleTimeoutStr := os.Getenv("NEWT_UDP_PROXY_IDLE_TIMEOUT")
dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION")
healthFile = os.Getenv("HEALTH_FILE")
// authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE")
@@ -337,6 +339,9 @@ func runNewtMain(ctx context.Context) {
if pingTimeoutStr == "" {
flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)")
}
if udpProxyIdleTimeoutStr == "" {
flag.StringVar(&udpProxyIdleTimeoutStr, "udp-proxy-idle-timeout", "90s", "Idle timeout for UDP proxied client flows before cleanup")
}
// load the prefer endpoint just as a flag
flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)")
if provisioningKey == "" {
@@ -386,6 +391,16 @@ func runNewtMain(ctx context.Context) {
pingTimeout = 7 * time.Second
}
if udpProxyIdleTimeoutStr != "" {
udpProxyIdleTimeout, err = time.ParseDuration(udpProxyIdleTimeoutStr)
if err != nil || udpProxyIdleTimeout <= 0 {
fmt.Printf("Invalid NEWT_UDP_PROXY_IDLE_TIMEOUT/--udp-proxy-idle-timeout value: %s, using default 90 seconds\n", udpProxyIdleTimeoutStr)
udpProxyIdleTimeout = 90 * time.Second
}
} else {
udpProxyIdleTimeout = 90 * time.Second
}
if dockerEnforceNetworkValidation == "" {
flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)")
}
@@ -896,6 +911,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
// Create proxy manager
pm = proxy.NewProxyManager(tnet)
pm.SetAsyncBytes(metricsAsyncBytes)
pm.SetUDPIdleTimeout(udpProxyIdleTimeout)
// Set tunnel_id for metrics (WireGuard peer public key)
pm.SetTunnelID(wgData.PublicKey)

View File

@@ -24,6 +24,7 @@ import (
const (
errUnsupportedProtoFmt = "unsupported protocol: %s"
maxUDPPacketSize = 65507
defaultUDPIdleTimeout = 90 * time.Second
)
// Target represents a proxy target with its address and port
@@ -47,6 +48,7 @@ type ProxyManager struct {
tunnels map[string]*tunnelEntry
asyncBytes bool
flushStop chan struct{}
udpIdleTimeout time.Duration
}
// tunnelEntry holds per-tunnel attributes and (optional) async counters.
@@ -132,6 +134,7 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
tunnels: make(map[string]*tunnelEntry),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}
@@ -209,6 +212,7 @@ func NewProxyManagerWithoutTNet() *ProxyManager {
udpTargets: make(map[string]map[int]string),
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}
@@ -345,6 +349,17 @@ func (pm *ProxyManager) SetAsyncBytes(b bool) {
go pm.flushLoop()
}
}
// SetUDPIdleTimeout configures when idle UDP client flows are reclaimed.
func (pm *ProxyManager) SetUDPIdleTimeout(d time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if d <= 0 {
pm.udpIdleTimeout = defaultUDPIdleTimeout
return
}
pm.udpIdleTimeout = d
}
func (pm *ProxyManager) flushLoop() {
flushInterval := 2 * time.Second
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
@@ -623,6 +638,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err))
continue
}
// Prevent idle UDP client goroutines from living forever and
// retaining large per-connection buffers.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
tunnelID := pm.currentTunnelID
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
@@ -657,6 +675,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
for {
n, _, err := targetConn.ReadFromUDP(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return
}
// Connection closed is normal during cleanup
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // defer will handle cleanup, result stays "success"
@@ -699,6 +721,8 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
delete(clientConns, clientKey)
clientsMutex.Unlock()
} else if pm.currentTunnelID != "" && written > 0 {
// Extend idle timeout whenever client traffic is observed.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(written))