From 31f899588f0406d9e1b2b857952b1ee7b7331fe5 Mon Sep 17 00:00:00 2001 From: Laurence Date: Thu, 9 Apr 2026 15:45:55 +0100 Subject: [PATCH] fix(proxy): reclaim idle UDP flows and make timeout configurable --- main.go | 16 ++++++++++++++++ proxy/manager.go | 24 ++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/main.go b/main.go index d5f2a96..7718c5d 100644 --- a/main.go +++ b/main.go @@ -129,6 +129,7 @@ var ( dockerEnforceNetworkValidationBool bool pingInterval time.Duration pingTimeout time.Duration + udpProxyIdleTimeout time.Duration publicKey wgtypes.Key pingStopChan chan struct{} stopFunc func() @@ -261,6 +262,7 @@ func runNewtMain(ctx context.Context) { dockerSocket = os.Getenv("DOCKER_SOCKET") pingIntervalStr := os.Getenv("PING_INTERVAL") pingTimeoutStr := os.Getenv("PING_TIMEOUT") + udpProxyIdleTimeoutStr := os.Getenv("NEWT_UDP_PROXY_IDLE_TIMEOUT") dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION") healthFile = os.Getenv("HEALTH_FILE") // authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE") @@ -337,6 +339,9 @@ func runNewtMain(ctx context.Context) { if pingTimeoutStr == "" { flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)") } + if udpProxyIdleTimeoutStr == "" { + flag.StringVar(&udpProxyIdleTimeoutStr, "udp-proxy-idle-timeout", "90s", "Idle timeout for UDP proxied client flows before cleanup") + } // load the prefer endpoint just as a flag flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)") if provisioningKey == "" { @@ -386,6 +391,16 @@ func runNewtMain(ctx context.Context) { pingTimeout = 7 * time.Second } + if udpProxyIdleTimeoutStr != "" { + udpProxyIdleTimeout, err = time.ParseDuration(udpProxyIdleTimeoutStr) + if err != nil || udpProxyIdleTimeout <= 0 { + fmt.Printf("Invalid NEWT_UDP_PROXY_IDLE_TIMEOUT/--udp-proxy-idle-timeout value: %s, using default 90 seconds\n", udpProxyIdleTimeoutStr) + udpProxyIdleTimeout = 90 * time.Second + } + } else { + udpProxyIdleTimeout = 90 * time.Second + } + if dockerEnforceNetworkValidation == "" { flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)") } @@ -896,6 +911,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( // Create proxy manager pm = proxy.NewProxyManager(tnet) pm.SetAsyncBytes(metricsAsyncBytes) + pm.SetUDPIdleTimeout(udpProxyIdleTimeout) // Set tunnel_id for metrics (WireGuard peer public key) pm.SetTunnelID(wgData.PublicKey) diff --git a/proxy/manager.go b/proxy/manager.go index 5566589..d02956c 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -24,6 +24,7 @@ import ( const ( errUnsupportedProtoFmt = "unsupported protocol: %s" maxUDPPacketSize = 65507 + defaultUDPIdleTimeout = 90 * time.Second ) // Target represents a proxy target with its address and port @@ -47,6 +48,7 @@ type ProxyManager struct { tunnels map[string]*tunnelEntry asyncBytes bool flushStop chan struct{} + udpIdleTimeout time.Duration } // tunnelEntry holds per-tunnel attributes and (optional) async counters. @@ -132,6 +134,7 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager { listeners: make([]*gonet.TCPListener, 0), udpConns: make([]*gonet.UDPConn, 0), tunnels: make(map[string]*tunnelEntry), + udpIdleTimeout: defaultUDPIdleTimeout, } } @@ -209,6 +212,7 @@ func NewProxyManagerWithoutTNet() *ProxyManager { udpTargets: make(map[string]map[int]string), listeners: make([]*gonet.TCPListener, 0), udpConns: make([]*gonet.UDPConn, 0), + udpIdleTimeout: defaultUDPIdleTimeout, } } @@ -345,6 +349,17 @@ func (pm *ProxyManager) SetAsyncBytes(b bool) { go pm.flushLoop() } } + +// SetUDPIdleTimeout configures when idle UDP client flows are reclaimed. +func (pm *ProxyManager) SetUDPIdleTimeout(d time.Duration) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + if d <= 0 { + pm.udpIdleTimeout = defaultUDPIdleTimeout + return + } + pm.udpIdleTimeout = d +} func (pm *ProxyManager) flushLoop() { flushInterval := 2 * time.Second if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" { @@ -623,6 +638,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err)) continue } + // Prevent idle UDP client goroutines from living forever and + // retaining large per-connection buffers. + _ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout)) tunnelID := pm.currentTunnelID telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "") telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened) @@ -657,6 +675,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { for { n, _, err := targetConn.ReadFromUDP(buffer) if err != nil { + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return + } // Connection closed is normal during cleanup if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { return // defer will handle cleanup, result stays "success" @@ -699,6 +721,8 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { delete(clientConns, clientKey) clientsMutex.Unlock() } else if pm.currentTunnelID != "" && written > 0 { + // Extend idle timeout whenever client traffic is observed. + _ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout)) if pm.asyncBytes { if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written))