From 5b637bb4caac627959868705b80907521295f24b Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 12 Jan 2026 12:20:59 -0800 Subject: [PATCH] Add expo backoff Former-commit-id: faae551aca7447b717e50137a237c78542cb14cf --- main.go | 1 + olm/olm.go | 12 +++++++ olm/types.go | 3 ++ peers/monitor/monitor.go | 61 +++++++++++++++++++++++++++------ peers/monitor/wgtester.go | 71 +++++++++++++++++++++++++++++++-------- 5 files changed, 123 insertions(+), 25 deletions(-) diff --git a/main.go b/main.go index f6c6973..5b6c15e 100644 --- a/main.go +++ b/main.go @@ -219,6 +219,7 @@ func runOlmMainWithArgs(ctx context.Context, cancel context.CancelFunc, signalCt Agent: "Olm CLI", OnExit: cancel, // Pass cancel function directly to trigger shutdown OnTerminated: cancel, + PprofAddr: ":4444", // TODO: REMOVE OR MAKE CONFIGURABLE } olm.Init(ctx, olmConfig) diff --git a/olm/olm.go b/olm/olm.go index 4d12952..03cf02b 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -5,6 +5,8 @@ import ( "encoding/json" "fmt" "net" + "net/http" + _ "net/http/pprof" "os" "runtime" "strconv" @@ -101,6 +103,16 @@ func Init(ctx context.Context, config GlobalConfig) { logger.GetLogger().SetLevel(util.ParseLogLevel(config.LogLevel)) + // Start pprof server if enabled + if config.PprofAddr != "" { + go func() { + logger.Info("Starting pprof server on %s", config.PprofAddr) + if err := http.ListenAndServe(config.PprofAddr, nil); err != nil { + logger.Error("Failed to start pprof server: %v", err) + } + }() + } + logger.Debug("Checking permissions for native interface") err := permissions.CheckNativeInterfacePermissions() if err != nil { diff --git a/olm/types.go b/olm/types.go index b7153af..a43121f 100644 --- a/olm/types.go +++ b/olm/types.go @@ -23,6 +23,9 @@ type GlobalConfig struct { Version string Agent string + // Debugging + PprofAddr string // Address to serve pprof on (e.g., "localhost:6060") + // Callbacks OnRegistered func() OnConnected func() diff --git a/peers/monitor/monitor.go b/peers/monitor/monitor.go index 27bc408..1ec267e 100644 --- a/peers/monitor/monitor.go +++ b/peers/monitor/monitor.go @@ -61,6 +61,13 @@ type PeerMonitor struct { holepunchMaxAttempts int // max consecutive failures before triggering relay holepunchFailures map[int]int // siteID -> consecutive failure count + // Exponential backoff fields for holepunch monitor + holepunchMinInterval time.Duration // Minimum interval (initial) + holepunchMaxInterval time.Duration // Maximum interval (cap for backoff) + holepunchBackoffMultiplier float64 // Multiplier for each stable check + holepunchStableCount map[int]int // siteID -> consecutive stable status count + holepunchCurrentInterval time.Duration // Current interval with backoff applied + // Rapid initial test fields rapidTestInterval time.Duration // interval between rapid test attempts rapidTestTimeout time.Duration // timeout for each rapid test attempt @@ -101,6 +108,12 @@ func NewPeerMonitor(wsClient *websocket.Client, middleDev *middleDevice.MiddleDe rapidTestMaxAttempts: 5, // 5 attempts = ~1-1.5 seconds total apiServer: apiServer, wgConnectionStatus: make(map[int]bool), + // Exponential backoff settings for holepunch monitor + holepunchMinInterval: 2 * time.Second, + holepunchMaxInterval: 30 * time.Second, + holepunchBackoffMultiplier: 1.5, + holepunchStableCount: make(map[int]int), + holepunchCurrentInterval: 2 * time.Second, } if err := pm.initNetstack(); err != nil { @@ -172,6 +185,7 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string, holepunchEndpoint st client.SetPacketInterval(pm.interval) client.SetTimeout(pm.timeout) client.SetMaxAttempts(pm.maxAttempts) + client.SetMaxInterval(30 * time.Second) // Allow backoff up to 30 seconds when stable pm.monitors[siteID] = client @@ -470,31 +484,50 @@ func (pm *PeerMonitor) stopHolepunchMonitor() { logger.Info("Stopped holepunch connection monitor") } -// runHolepunchMonitor runs the holepunch monitoring loop +// runHolepunchMonitor runs the holepunch monitoring loop with exponential backoff func (pm *PeerMonitor) runHolepunchMonitor() { - ticker := time.NewTicker(pm.holepunchInterval) - defer ticker.Stop() + pm.mutex.Lock() + pm.holepunchCurrentInterval = pm.holepunchMinInterval + pm.mutex.Unlock() - // Do initial check immediately - pm.checkHolepunchEndpoints() + timer := time.NewTimer(0) // Fire immediately for initial check + defer timer.Stop() for { select { case <-pm.holepunchStopChan: return - case <-ticker.C: - pm.checkHolepunchEndpoints() + case <-timer.C: + anyStatusChanged := pm.checkHolepunchEndpoints() + + pm.mutex.Lock() + if anyStatusChanged { + // Reset to minimum interval on any status change + pm.holepunchCurrentInterval = pm.holepunchMinInterval + } else { + // Apply exponential backoff when stable + newInterval := time.Duration(float64(pm.holepunchCurrentInterval) * pm.holepunchBackoffMultiplier) + if newInterval > pm.holepunchMaxInterval { + newInterval = pm.holepunchMaxInterval + } + pm.holepunchCurrentInterval = newInterval + } + currentInterval := pm.holepunchCurrentInterval + pm.mutex.Unlock() + + timer.Reset(currentInterval) } } } // checkHolepunchEndpoints tests all holepunch endpoints -func (pm *PeerMonitor) checkHolepunchEndpoints() { +// Returns true if any endpoint's status changed +func (pm *PeerMonitor) checkHolepunchEndpoints() bool { pm.mutex.Lock() // Check if we're still running before doing any work if !pm.running { pm.mutex.Unlock() - return + return false } endpoints := make(map[int]string, len(pm.holepunchEndpoints)) for siteID, endpoint := range pm.holepunchEndpoints { @@ -504,6 +537,8 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() { maxAttempts := pm.holepunchMaxAttempts pm.mutex.Unlock() + anyStatusChanged := false + for siteID, endpoint := range endpoints { // logger.Debug("Testing holepunch endpoint for site %d: %s", siteID, endpoint) result := pm.holepunchTester.TestEndpoint(endpoint, timeout) @@ -529,7 +564,9 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() { pm.mutex.Unlock() // Log status changes - if !exists || previousStatus != result.Success { + statusChanged := !exists || previousStatus != result.Success + if statusChanged { + anyStatusChanged = true if result.Success { logger.Info("Holepunch to site %d (%s) is CONNECTED (RTT: %v)", siteID, endpoint, result.RTT) } else { @@ -562,7 +599,7 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() { pm.mutex.Unlock() if !stillRunning { - return // Stop processing if shutdown is in progress + return anyStatusChanged // Stop processing if shutdown is in progress } if !result.Success && !isRelayed && failureCount >= maxAttempts { @@ -579,6 +616,8 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() { } } } + + return anyStatusChanged } // GetHolepunchStatus returns the current holepunch status for all endpoints diff --git a/peers/monitor/wgtester.go b/peers/monitor/wgtester.go index dac2008..21f788a 100644 --- a/peers/monitor/wgtester.go +++ b/peers/monitor/wgtester.go @@ -36,6 +36,12 @@ type Client struct { timeout time.Duration maxAttempts int dialer Dialer + + // Exponential backoff fields + minInterval time.Duration // Minimum interval (initial) + maxInterval time.Duration // Maximum interval (cap for backoff) + backoffMultiplier float64 // Multiplier for each stable check + stableCountToBackoff int // Number of stable checks before backing off } // Dialer is a function that creates a connection @@ -50,18 +56,23 @@ type ConnectionStatus struct { // NewClient creates a new connection test client func NewClient(serverAddr string, dialer Dialer) (*Client, error) { return &Client{ - serverAddr: serverAddr, - shutdownCh: make(chan struct{}), - packetInterval: 2 * time.Second, - timeout: 500 * time.Millisecond, // Timeout for individual packets - maxAttempts: 3, // Default max attempts - dialer: dialer, + serverAddr: serverAddr, + shutdownCh: make(chan struct{}), + packetInterval: 2 * time.Second, + minInterval: 2 * time.Second, + maxInterval: 30 * time.Second, + backoffMultiplier: 1.5, + stableCountToBackoff: 3, // After 3 consecutive same-state results, start backing off + timeout: 500 * time.Millisecond, // Timeout for individual packets + maxAttempts: 3, // Default max attempts + dialer: dialer, }, nil } // SetPacketInterval changes how frequently packets are sent in monitor mode func (c *Client) SetPacketInterval(interval time.Duration) { c.packetInterval = interval + c.minInterval = interval } // SetTimeout changes the timeout for waiting for responses @@ -74,6 +85,16 @@ func (c *Client) SetMaxAttempts(attempts int) { c.maxAttempts = attempts } +// SetMaxInterval sets the maximum backoff interval +func (c *Client) SetMaxInterval(interval time.Duration) { + c.maxInterval = interval +} + +// SetBackoffMultiplier sets the multiplier for exponential backoff +func (c *Client) SetBackoffMultiplier(multiplier float64) { + c.backoffMultiplier = multiplier +} + // UpdateServerAddr updates the server address and resets the connection func (c *Client) UpdateServerAddr(serverAddr string) { c.connLock.Lock() @@ -138,6 +159,9 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) { binary.BigEndian.PutUint32(packet[0:4], magicHeader) packet[4] = packetTypeRequest + // Reusable response buffer + responseBuffer := make([]byte, packetSize) + // Send multiple attempts as specified for attempt := 0; attempt < c.maxAttempts; attempt++ { select { @@ -157,20 +181,17 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) { return false, 0 } - // logger.Debug("Attempting to send monitor packet to %s", c.serverAddr) _, err := c.conn.Write(packet) if err != nil { c.connLock.Unlock() logger.Info("Error sending packet: %v", err) continue } - // logger.Debug("Successfully sent monitor packet") // Set read deadline c.conn.SetReadDeadline(time.Now().Add(c.timeout)) // Wait for response - responseBuffer := make([]byte, packetSize) n, err := c.conn.Read(responseBuffer) c.connLock.Unlock() @@ -238,28 +259,50 @@ func (c *Client) StartMonitor(callback MonitorCallback) error { go func() { var lastConnected bool firstRun := true + stableCount := 0 + currentInterval := c.minInterval - ticker := time.NewTicker(c.packetInterval) - defer ticker.Stop() + timer := time.NewTimer(currentInterval) + defer timer.Stop() for { select { case <-c.shutdownCh: return - case <-ticker.C: + case <-timer.C: ctx, cancel := context.WithTimeout(context.Background(), c.timeout) connected, rtt := c.TestConnection(ctx) cancel() + statusChanged := connected != lastConnected + // Callback if status changed or it's the first check - if connected != lastConnected || firstRun { + if statusChanged || firstRun { callback(ConnectionStatus{ Connected: connected, RTT: rtt, }) lastConnected = connected firstRun = false + // Reset backoff on status change + stableCount = 0 + currentInterval = c.minInterval + } else { + // Status is stable, increment counter + stableCount++ + + // Apply exponential backoff after stable threshold + if stableCount >= c.stableCountToBackoff { + newInterval := time.Duration(float64(currentInterval) * c.backoffMultiplier) + if newInterval > c.maxInterval { + newInterval = c.maxInterval + } + currentInterval = newInterval + } } + + // Reset timer with current interval + timer.Reset(currentInterval) } } }() @@ -278,4 +321,4 @@ func (c *Client) StopMonitor() { close(c.shutdownCh) c.monitorRunning = false -} +} \ No newline at end of file