diff --git a/relay/relay.go b/relay/relay.go index dad47c1..0cad9bb 100644 --- a/relay/relay.go +++ b/relay/relay.go @@ -137,6 +137,17 @@ const ( WireGuardMessageTypeTransportData = 4 ) +// cachedEndpointState holds the last-known endpoint fields used for change detection. +// Timestamp is intentionally excluded since it always changes. +type cachedEndpointState struct { + OlmID string + NewtID string + Token string + IP string + Port int + PublicKey string +} + // --- End Types --- // bufferPool allows reusing buffers to reduce allocations. @@ -172,6 +183,12 @@ type UDPProxyServer struct { // Cache for resolved UDP addresses to avoid per-packet DNS lookups // Key: "ip:port" string, Value: *net.UDPAddr addrCache sync.Map + // lastEndpointCache stores the last-known endpoint state per client (key: olmId:newtId) + // used to skip redundant HTTP notifications when nothing has changed. + lastEndpointCache sync.Map + // notifyChan is the async queue for hole-punch endpoint notifications. + // Dedicated notifier workers drain this channel and perform the HTTP call. + notifyChan chan ClientEndpoint // ReachableAt is the URL where this server can be reached ReachableAt string } @@ -184,6 +201,7 @@ func NewUDPProxyServer(parentCtx context.Context, addr, serverURL string, privat serverURL: serverURL, privateKey: privateKey, packetChan: make(chan Packet, 50000), // Increased from 1000 to handle high throughput + notifyChan: make(chan ClientEndpoint, 1000), ReachableAt: reachableAt, ctx: ctx, cancel: cancel, @@ -237,6 +255,11 @@ func (s *UDPProxyServer) Start() error { // Start the hole punch rate limiter cleanup routine go s.cleanupHolePunchRateLimiter() + // Start async endpoint notifier workers (HTTP calls off the hot path) + for i := 0; i < 5; i++ { + go s.endpointNotifierWorker() + } + return nil } @@ -375,7 +398,37 @@ func (s *UDPProxyServer) packetWorker() { ClientPublicKey: msg.PublicKey, } logger.Debug("Created endpoint from packet remoteAddr %s: IP=%s, Port=%d", packet.remoteAddr.String(), endpoint.IP, endpoint.Port) - s.notifyServer(endpoint) + + // Check if anything meaningful changed before queuing an HTTP notification. + cacheKey := endpoint.OlmID + ":" + endpoint.NewtID + newState := cachedEndpointState{ + OlmID: endpoint.OlmID, + NewtID: endpoint.NewtID, + Token: endpoint.Token, + IP: endpoint.IP, + Port: endpoint.Port, + PublicKey: endpoint.ClientPublicKey, + } + if cached, ok := s.lastEndpointCache.Load(cacheKey); ok && cached.(cachedEndpointState) == newState { + // Endpoint unchanged - skip the HTTP call but still clear stale sessions. + logger.Debug("Endpoint unchanged for %s, skipping notification", cacheKey) + metrics.RecordHolePunchEvent(relayIfname, "deduplicated") + s.clearSessionsForIP(endpoint.IP) + metrics.RecordHolePunchEvent(relayIfname, "success") + bufferPool.Put(packet.data[:1500]) + continue + } + s.lastEndpointCache.Store(cacheKey, newState) + + // Queue the notification asynchronously so the hot path is not blocked by HTTP. + select { + case s.notifyChan <- endpoint: + case <-s.ctx.Done(): + // shutting down + default: + logger.Debug("Notification queue full, dropping hole punch notification for %s:%d", endpoint.IP, endpoint.Port) + metrics.RecordHolePunchEvent(relayIfname, "queue_full") + } s.clearSessionsForIP(endpoint.IP) // Clear sessions for this IP to allow re-establishment metrics.RecordHolePunchEvent(relayIfname, "success") } @@ -384,6 +437,22 @@ func (s *UDPProxyServer) packetWorker() { } } +// endpointNotifierWorker drains the notifyChan and performs the HTTP notification for each +// hole-punch endpoint. Running several of these keeps latency low even when the server is slow. +func (s *UDPProxyServer) endpointNotifierWorker() { + for { + select { + case endpoint, ok := <-s.notifyChan: + if !ok { + return + } + s.notifyServer(endpoint) + case <-s.ctx.Done(): + return + } + } +} + // decryptMessage decrypts the message using the server's private key func (s *UDPProxyServer) decryptMessage(encMsg EncryptedHolePunchMessage) ([]byte, error) { // Parse the ephemeral public key