mirror of
https://github.com/fosrl/newt.git
synced 2026-04-14 22:06:37 +00:00
Merge pull request #306 from LaurenceJJones/investigate/heap-leak-udp-proxy
fix(proxy): reclaim idle UDP flows and make timeout configurable
This commit is contained in:
16
main.go
16
main.go
@@ -129,6 +129,7 @@ var (
|
|||||||
dockerEnforceNetworkValidationBool bool
|
dockerEnforceNetworkValidationBool bool
|
||||||
pingInterval time.Duration
|
pingInterval time.Duration
|
||||||
pingTimeout time.Duration
|
pingTimeout time.Duration
|
||||||
|
udpProxyIdleTimeout time.Duration
|
||||||
publicKey wgtypes.Key
|
publicKey wgtypes.Key
|
||||||
pingStopChan chan struct{}
|
pingStopChan chan struct{}
|
||||||
stopFunc func()
|
stopFunc func()
|
||||||
@@ -261,6 +262,7 @@ func runNewtMain(ctx context.Context) {
|
|||||||
dockerSocket = os.Getenv("DOCKER_SOCKET")
|
dockerSocket = os.Getenv("DOCKER_SOCKET")
|
||||||
pingIntervalStr := os.Getenv("PING_INTERVAL")
|
pingIntervalStr := os.Getenv("PING_INTERVAL")
|
||||||
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
|
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
|
||||||
|
udpProxyIdleTimeoutStr := os.Getenv("NEWT_UDP_PROXY_IDLE_TIMEOUT")
|
||||||
dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION")
|
dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION")
|
||||||
healthFile = os.Getenv("HEALTH_FILE")
|
healthFile = os.Getenv("HEALTH_FILE")
|
||||||
// authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE")
|
// authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE")
|
||||||
@@ -337,6 +339,9 @@ func runNewtMain(ctx context.Context) {
|
|||||||
if pingTimeoutStr == "" {
|
if pingTimeoutStr == "" {
|
||||||
flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)")
|
flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)")
|
||||||
}
|
}
|
||||||
|
if udpProxyIdleTimeoutStr == "" {
|
||||||
|
flag.StringVar(&udpProxyIdleTimeoutStr, "udp-proxy-idle-timeout", "90s", "Idle timeout for UDP proxied client flows before cleanup")
|
||||||
|
}
|
||||||
// load the prefer endpoint just as a flag
|
// load the prefer endpoint just as a flag
|
||||||
flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)")
|
flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)")
|
||||||
if provisioningKey == "" {
|
if provisioningKey == "" {
|
||||||
@@ -386,6 +391,16 @@ func runNewtMain(ctx context.Context) {
|
|||||||
pingTimeout = 7 * time.Second
|
pingTimeout = 7 * time.Second
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if udpProxyIdleTimeoutStr != "" {
|
||||||
|
udpProxyIdleTimeout, err = time.ParseDuration(udpProxyIdleTimeoutStr)
|
||||||
|
if err != nil || udpProxyIdleTimeout <= 0 {
|
||||||
|
fmt.Printf("Invalid NEWT_UDP_PROXY_IDLE_TIMEOUT/--udp-proxy-idle-timeout value: %s, using default 90 seconds\n", udpProxyIdleTimeoutStr)
|
||||||
|
udpProxyIdleTimeout = 90 * time.Second
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
udpProxyIdleTimeout = 90 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
if dockerEnforceNetworkValidation == "" {
|
if dockerEnforceNetworkValidation == "" {
|
||||||
flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)")
|
flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)")
|
||||||
}
|
}
|
||||||
@@ -896,6 +911,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
|
|||||||
// Create proxy manager
|
// Create proxy manager
|
||||||
pm = proxy.NewProxyManager(tnet)
|
pm = proxy.NewProxyManager(tnet)
|
||||||
pm.SetAsyncBytes(metricsAsyncBytes)
|
pm.SetAsyncBytes(metricsAsyncBytes)
|
||||||
|
pm.SetUDPIdleTimeout(udpProxyIdleTimeout)
|
||||||
// Set tunnel_id for metrics (WireGuard peer public key)
|
// Set tunnel_id for metrics (WireGuard peer public key)
|
||||||
pm.SetTunnelID(wgData.PublicKey)
|
pm.SetTunnelID(wgData.PublicKey)
|
||||||
|
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
errUnsupportedProtoFmt = "unsupported protocol: %s"
|
errUnsupportedProtoFmt = "unsupported protocol: %s"
|
||||||
maxUDPPacketSize = 65507 // Maximum UDP packet size
|
maxUDPPacketSize = 65507 // Maximum UDP packet size
|
||||||
|
defaultUDPIdleTimeout = 90 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// udpBufferPool provides reusable buffers for UDP packet handling.
|
// udpBufferPool provides reusable buffers for UDP packet handling.
|
||||||
@@ -68,6 +69,7 @@ type ProxyManager struct {
|
|||||||
tunnels map[string]*tunnelEntry
|
tunnels map[string]*tunnelEntry
|
||||||
asyncBytes bool
|
asyncBytes bool
|
||||||
flushStop chan struct{}
|
flushStop chan struct{}
|
||||||
|
udpIdleTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// tunnelEntry holds per-tunnel attributes and (optional) async counters.
|
// tunnelEntry holds per-tunnel attributes and (optional) async counters.
|
||||||
@@ -153,6 +155,7 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
|
|||||||
listeners: make([]*gonet.TCPListener, 0),
|
listeners: make([]*gonet.TCPListener, 0),
|
||||||
udpConns: make([]*gonet.UDPConn, 0),
|
udpConns: make([]*gonet.UDPConn, 0),
|
||||||
tunnels: make(map[string]*tunnelEntry),
|
tunnels: make(map[string]*tunnelEntry),
|
||||||
|
udpIdleTimeout: defaultUDPIdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -230,6 +233,7 @@ func NewProxyManagerWithoutTNet() *ProxyManager {
|
|||||||
udpTargets: make(map[string]map[int]string),
|
udpTargets: make(map[string]map[int]string),
|
||||||
listeners: make([]*gonet.TCPListener, 0),
|
listeners: make([]*gonet.TCPListener, 0),
|
||||||
udpConns: make([]*gonet.UDPConn, 0),
|
udpConns: make([]*gonet.UDPConn, 0),
|
||||||
|
udpIdleTimeout: defaultUDPIdleTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -366,6 +370,17 @@ func (pm *ProxyManager) SetAsyncBytes(b bool) {
|
|||||||
go pm.flushLoop()
|
go pm.flushLoop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetUDPIdleTimeout configures when idle UDP client flows are reclaimed.
|
||||||
|
func (pm *ProxyManager) SetUDPIdleTimeout(d time.Duration) {
|
||||||
|
pm.mutex.Lock()
|
||||||
|
defer pm.mutex.Unlock()
|
||||||
|
if d <= 0 {
|
||||||
|
pm.udpIdleTimeout = defaultUDPIdleTimeout
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pm.udpIdleTimeout = d
|
||||||
|
}
|
||||||
func (pm *ProxyManager) flushLoop() {
|
func (pm *ProxyManager) flushLoop() {
|
||||||
flushInterval := 2 * time.Second
|
flushInterval := 2 * time.Second
|
||||||
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
|
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
|
||||||
@@ -646,6 +661,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err))
|
telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// Prevent idle UDP client goroutines from living forever and
|
||||||
|
// retaining large per-connection buffers.
|
||||||
|
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
|
||||||
tunnelID := pm.currentTunnelID
|
tunnelID := pm.currentTunnelID
|
||||||
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
|
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
|
||||||
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
|
||||||
@@ -683,6 +701,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
for {
|
for {
|
||||||
n, _, err := targetConn.ReadFromUDP(buffer)
|
n, _, err := targetConn.ReadFromUDP(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
var netErr net.Error
|
||||||
|
if errors.As(err, &netErr) && netErr.Timeout() {
|
||||||
|
return
|
||||||
|
}
|
||||||
// Connection closed is normal during cleanup
|
// Connection closed is normal during cleanup
|
||||||
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
|
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
|
||||||
return // defer will handle cleanup, result stays "success"
|
return // defer will handle cleanup, result stays "success"
|
||||||
@@ -725,6 +747,8 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
delete(clientConns, clientKey)
|
delete(clientConns, clientKey)
|
||||||
clientsMutex.Unlock()
|
clientsMutex.Unlock()
|
||||||
} else if pm.currentTunnelID != "" && written > 0 {
|
} else if pm.currentTunnelID != "" && written > 0 {
|
||||||
|
// Extend idle timeout whenever client traffic is observed.
|
||||||
|
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
|
||||||
if pm.asyncBytes {
|
if pm.asyncBytes {
|
||||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||||
e.bytesInUDP.Add(uint64(written))
|
e.bytesInUDP.Add(uint64(written))
|
||||||
|
|||||||
Reference in New Issue
Block a user