Reduce latency calling http endpoint in hot path

This commit is contained in:
Owen
2026-05-14 15:39:29 -07:00
parent e5241751fc
commit 180b5fe768

View File

@@ -137,6 +137,17 @@ const (
WireGuardMessageTypeTransportData = 4
)
// cachedEndpointState holds the last-known endpoint fields used for change detection.
// Timestamp is intentionally excluded since it always changes.
type cachedEndpointState struct {
OlmID string
NewtID string
Token string
IP string
Port int
PublicKey string
}
// --- End Types ---
// bufferPool allows reusing buffers to reduce allocations.
@@ -172,6 +183,12 @@ type UDPProxyServer struct {
// Cache for resolved UDP addresses to avoid per-packet DNS lookups
// Key: "ip:port" string, Value: *net.UDPAddr
addrCache sync.Map
// lastEndpointCache stores the last-known endpoint state per client (key: olmId:newtId)
// used to skip redundant HTTP notifications when nothing has changed.
lastEndpointCache sync.Map
// notifyChan is the async queue for hole-punch endpoint notifications.
// Dedicated notifier workers drain this channel and perform the HTTP call.
notifyChan chan ClientEndpoint
// ReachableAt is the URL where this server can be reached
ReachableAt string
}
@@ -184,6 +201,7 @@ func NewUDPProxyServer(parentCtx context.Context, addr, serverURL string, privat
serverURL: serverURL,
privateKey: privateKey,
packetChan: make(chan Packet, 50000), // Increased from 1000 to handle high throughput
notifyChan: make(chan ClientEndpoint, 1000),
ReachableAt: reachableAt,
ctx: ctx,
cancel: cancel,
@@ -237,6 +255,11 @@ func (s *UDPProxyServer) Start() error {
// Start the hole punch rate limiter cleanup routine
go s.cleanupHolePunchRateLimiter()
// Start async endpoint notifier workers (HTTP calls off the hot path)
for i := 0; i < 5; i++ {
go s.endpointNotifierWorker()
}
return nil
}
@@ -375,7 +398,37 @@ func (s *UDPProxyServer) packetWorker() {
ClientPublicKey: msg.PublicKey,
}
logger.Debug("Created endpoint from packet remoteAddr %s: IP=%s, Port=%d", packet.remoteAddr.String(), endpoint.IP, endpoint.Port)
s.notifyServer(endpoint)
// Check if anything meaningful changed before queuing an HTTP notification.
cacheKey := endpoint.OlmID + ":" + endpoint.NewtID
newState := cachedEndpointState{
OlmID: endpoint.OlmID,
NewtID: endpoint.NewtID,
Token: endpoint.Token,
IP: endpoint.IP,
Port: endpoint.Port,
PublicKey: endpoint.ClientPublicKey,
}
if cached, ok := s.lastEndpointCache.Load(cacheKey); ok && cached.(cachedEndpointState) == newState {
// Endpoint unchanged - skip the HTTP call but still clear stale sessions.
logger.Debug("Endpoint unchanged for %s, skipping notification", cacheKey)
metrics.RecordHolePunchEvent(relayIfname, "deduplicated")
s.clearSessionsForIP(endpoint.IP)
metrics.RecordHolePunchEvent(relayIfname, "success")
bufferPool.Put(packet.data[:1500])
continue
}
s.lastEndpointCache.Store(cacheKey, newState)
// Queue the notification asynchronously so the hot path is not blocked by HTTP.
select {
case s.notifyChan <- endpoint:
case <-s.ctx.Done():
// shutting down
default:
logger.Debug("Notification queue full, dropping hole punch notification for %s:%d", endpoint.IP, endpoint.Port)
metrics.RecordHolePunchEvent(relayIfname, "queue_full")
}
s.clearSessionsForIP(endpoint.IP) // Clear sessions for this IP to allow re-establishment
metrics.RecordHolePunchEvent(relayIfname, "success")
}
@@ -384,6 +437,22 @@ func (s *UDPProxyServer) packetWorker() {
}
}
// endpointNotifierWorker drains the notifyChan and performs the HTTP notification for each
// hole-punch endpoint. Running several of these keeps latency low even when the server is slow.
func (s *UDPProxyServer) endpointNotifierWorker() {
for {
select {
case endpoint, ok := <-s.notifyChan:
if !ok {
return
}
s.notifyServer(endpoint)
case <-s.ctx.Done():
return
}
}
}
// decryptMessage decrypts the message using the server's private key
func (s *UDPProxyServer) decryptMessage(encMsg EncryptedHolePunchMessage) ([]byte, error) {
// Parse the ephemeral public key