Compare commits

...

9 Commits

Author SHA1 Message Date
Owen Schwartz
dc2e23380a Merge pull request #306 from LaurenceJJones/investigate/heap-leak-udp-proxy
fix(proxy): reclaim idle UDP flows and make timeout configurable
2026-04-13 10:27:37 -07:00
Laurence
0569525743 Merge remote-tracking branch 'upstream/dev' into investigate/heap-leak-udp-proxy
Made-with: Cursor

# Conflicts:
#	proxy/manager.go
2026-04-10 13:36:13 +01:00
Owen Schwartz
6becf0f719 Merge pull request #277 from LaurenceJJones/refactor/proxy-udp-buffer-pool
perf(proxy): add sync.Pool for UDP buffers
2026-04-09 13:09:06 -04:00
Laurence
4d8d00241d perf(proxy): add sync.Pool for UDP buffers
- Add udpBufferPool for reusable 65507-byte UDP packet buffers
- Add getUDPBuffer() and putUDPBuffer() helper functions
- Clear buffer contents before returning to pool to prevent data leakage
- Apply pooling to both main handler buffer and per-client goroutine buffers
- Reduces GC pressure from frequent large allocations during UDP proxying

Made-with: Cursor
2026-04-09 15:59:03 +01:00
Laurence
31f899588f fix(proxy): reclaim idle UDP flows and make timeout configurable 2026-04-09 15:45:55 +01:00
Owen Schwartz
7e1e3408d5 Merge pull request #302 from LaurenceJJones/fix/config-file-provision-save
fix: allow empty config file bootstrap before provisioning
2026-04-08 21:58:07 -04:00
Laurence
d7c3c38d24 fix: allow empty config file bootstrap before provisioning
Treat an empty CONFIG_FILE as initial state instead of failing JSON parse, so provisioning can proceed and credentials can be saved. Ref: fosrl/pangolin#2812
2026-04-08 14:13:13 +01:00
Owen
27e471942e Add CODEOWNERS 2026-04-07 11:34:18 -04:00
Owen
184bfb12d6 Delete bad bp 2026-04-03 17:36:48 -04:00
6 changed files with 110 additions and 40 deletions

1
.github/CODEOWNERS vendored Normal file
View File

@@ -0,0 +1 @@
* @oschwartz10612 @miloschwartz

View File

@@ -1,37 +0,0 @@
resources:
resource-nice-id:
name: this is my resource
protocol: http
full-domain: level1.test3.example.com
host-header: example.com
tls-server-name: example.com
auth:
pincode: 123456
password: sadfasdfadsf
sso-enabled: true
sso-roles:
- Member
sso-users:
- owen@pangolin.net
whitelist-users:
- owen@pangolin.net
targets:
# - site: glossy-plains-viscacha-rat
- hostname: localhost
method: http
port: 8000
healthcheck:
port: 8000
hostname: localhost
# - site: glossy-plains-viscacha-rat
- hostname: localhost
method: http
port: 8001
resource-nice-id2:
name: this is other resource
protocol: tcp
proxy-port: 3000
targets:
# - site: glossy-plains-viscacha-rat
- hostname: localhost
port: 3000

16
main.go
View File

@@ -129,6 +129,7 @@ var (
dockerEnforceNetworkValidationBool bool
pingInterval time.Duration
pingTimeout time.Duration
udpProxyIdleTimeout time.Duration
publicKey wgtypes.Key
pingStopChan chan struct{}
stopFunc func()
@@ -261,6 +262,7 @@ func runNewtMain(ctx context.Context) {
dockerSocket = os.Getenv("DOCKER_SOCKET")
pingIntervalStr := os.Getenv("PING_INTERVAL")
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
udpProxyIdleTimeoutStr := os.Getenv("NEWT_UDP_PROXY_IDLE_TIMEOUT")
dockerEnforceNetworkValidation = os.Getenv("DOCKER_ENFORCE_NETWORK_VALIDATION")
healthFile = os.Getenv("HEALTH_FILE")
// authorizedKeysFile = os.Getenv("AUTHORIZED_KEYS_FILE")
@@ -337,6 +339,9 @@ func runNewtMain(ctx context.Context) {
if pingTimeoutStr == "" {
flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)")
}
if udpProxyIdleTimeoutStr == "" {
flag.StringVar(&udpProxyIdleTimeoutStr, "udp-proxy-idle-timeout", "90s", "Idle timeout for UDP proxied client flows before cleanup")
}
// load the prefer endpoint just as a flag
flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)")
if provisioningKey == "" {
@@ -386,6 +391,16 @@ func runNewtMain(ctx context.Context) {
pingTimeout = 7 * time.Second
}
if udpProxyIdleTimeoutStr != "" {
udpProxyIdleTimeout, err = time.ParseDuration(udpProxyIdleTimeoutStr)
if err != nil || udpProxyIdleTimeout <= 0 {
fmt.Printf("Invalid NEWT_UDP_PROXY_IDLE_TIMEOUT/--udp-proxy-idle-timeout value: %s, using default 90 seconds\n", udpProxyIdleTimeoutStr)
udpProxyIdleTimeout = 90 * time.Second
}
} else {
udpProxyIdleTimeout = 90 * time.Second
}
if dockerEnforceNetworkValidation == "" {
flag.StringVar(&dockerEnforceNetworkValidation, "docker-enforce-network-validation", "false", "Enforce validation of container on newt network (true or false)")
}
@@ -896,6 +911,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
// Create proxy manager
pm = proxy.NewProxyManager(tnet)
pm.SetAsyncBytes(metricsAsyncBytes)
pm.SetUDPIdleTimeout(udpProxyIdleTimeout)
// Set tunnel_id for metrics (WireGuard peer public key)
pm.SetTunnelID(wgData.PublicKey)

View File

@@ -23,9 +23,31 @@ import (
const (
errUnsupportedProtoFmt = "unsupported protocol: %s"
maxUDPPacketSize = 65507
maxUDPPacketSize = 65507 // Maximum UDP packet size
defaultUDPIdleTimeout = 90 * time.Second
)
// udpBufferPool provides reusable buffers for UDP packet handling.
// This reduces GC pressure from frequent large allocations.
var udpBufferPool = sync.Pool{
New: func() any {
buf := make([]byte, maxUDPPacketSize)
return &buf
},
}
// getUDPBuffer retrieves a buffer from the pool.
func getUDPBuffer() *[]byte {
return udpBufferPool.Get().(*[]byte)
}
// putUDPBuffer clears and returns a buffer to the pool.
func putUDPBuffer(buf *[]byte) {
// Clear the buffer to prevent data leakage
clear(*buf)
udpBufferPool.Put(buf)
}
// Target represents a proxy target with its address and port
type Target struct {
Address string
@@ -47,6 +69,7 @@ type ProxyManager struct {
tunnels map[string]*tunnelEntry
asyncBytes bool
flushStop chan struct{}
udpIdleTimeout time.Duration
}
// tunnelEntry holds per-tunnel attributes and (optional) async counters.
@@ -132,6 +155,7 @@ func NewProxyManager(tnet *netstack.Net) *ProxyManager {
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
tunnels: make(map[string]*tunnelEntry),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}
@@ -209,6 +233,7 @@ func NewProxyManagerWithoutTNet() *ProxyManager {
udpTargets: make(map[string]map[int]string),
listeners: make([]*gonet.TCPListener, 0),
udpConns: make([]*gonet.UDPConn, 0),
udpIdleTimeout: defaultUDPIdleTimeout,
}
}
@@ -345,6 +370,17 @@ func (pm *ProxyManager) SetAsyncBytes(b bool) {
go pm.flushLoop()
}
}
// SetUDPIdleTimeout configures when idle UDP client flows are reclaimed.
func (pm *ProxyManager) SetUDPIdleTimeout(d time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
if d <= 0 {
pm.udpIdleTimeout = defaultUDPIdleTimeout
return
}
pm.udpIdleTimeout = d
}
func (pm *ProxyManager) flushLoop() {
flushInterval := 2 * time.Second
if v := os.Getenv("OTEL_METRIC_EXPORT_INTERVAL"); v != "" {
@@ -555,7 +591,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
}
func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
buffer := make([]byte, maxUDPPacketSize) // Max UDP packet size
bufPtr := getUDPBuffer()
defer putUDPBuffer(bufPtr)
buffer := *bufPtr
clientConns := make(map[string]*net.UDPConn)
var clientsMutex sync.RWMutex
@@ -623,6 +661,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
telemetry.IncProxyAccept(context.Background(), pm.currentTunnelID, "udp", "failure", classifyProxyError(err))
continue
}
// Prevent idle UDP client goroutines from living forever and
// retaining large per-connection buffers.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
tunnelID := pm.currentTunnelID
telemetry.IncProxyAccept(context.Background(), tunnelID, "udp", "success", "")
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionOpened)
@@ -638,7 +679,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
go func(clientKey string, targetConn *net.UDPConn, remoteAddr net.Addr, tunnelID string) {
start := time.Now()
result := "success"
bufPtr := getUDPBuffer()
defer func() {
// Return buffer to pool first
putUDPBuffer(bufPtr)
// Always clean up when this goroutine exits
clientsMutex.Lock()
if storedConn, exists := clientConns[clientKey]; exists && storedConn == targetConn {
@@ -653,10 +697,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed)
}()
buffer := make([]byte, maxUDPPacketSize)
buffer := *bufPtr
for {
n, _, err := targetConn.ReadFromUDP(buffer)
if err != nil {
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return
}
// Connection closed is normal during cleanup
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
return // defer will handle cleanup, result stays "success"
@@ -699,6 +747,8 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
delete(clientConns, clientKey)
clientsMutex.Unlock()
} else if pm.currentTunnelID != "" && written > 0 {
// Extend idle timeout whenever client traffic is observed.
_ = targetConn.SetReadDeadline(time.Now().Add(pm.udpIdleTimeout))
if pm.asyncBytes {
if e := pm.getEntry(pm.currentTunnelID); e != nil {
e.bytesInUDP.Add(uint64(written))

View File

@@ -71,6 +71,11 @@ func (c *Client) loadConfig() error {
}
return err
}
if len(bytes.TrimSpace(data)) == 0 {
logger.Info("Config file at %s is empty, will initialize it with provided values", configPath)
c.configNeedsSave = true
return nil
}
var config Config
if err := json.Unmarshal(data, &config); err != nil {

35
websocket/config_test.go Normal file
View File

@@ -0,0 +1,35 @@
package websocket
import (
"os"
"path/filepath"
"testing"
)
func TestLoadConfig_EmptyFileMarksConfigForSave(t *testing.T) {
t.Setenv("CONFIG_FILE", "")
tmpDir := t.TempDir()
configPath := filepath.Join(tmpDir, "config.json")
if err := os.WriteFile(configPath, []byte(""), 0o644); err != nil {
t.Fatalf("failed to create empty config file: %v", err)
}
client := &Client{
config: &Config{
Endpoint: "https://example.com",
ProvisioningKey: "spk-test",
},
clientType: "newt",
configFilePath: configPath,
}
if err := client.loadConfig(); err != nil {
t.Fatalf("loadConfig returned error for empty file: %v", err)
}
if !client.configNeedsSave {
t.Fatal("expected empty config file to mark configNeedsSave")
}
}