mirror of
https://github.com/fosrl/olm.git
synced 2026-03-11 05:06:43 +00:00
Add expo backoff
This commit is contained in:
@@ -36,6 +36,12 @@ type Client struct {
|
||||
timeout time.Duration
|
||||
maxAttempts int
|
||||
dialer Dialer
|
||||
|
||||
// Exponential backoff fields
|
||||
minInterval time.Duration // Minimum interval (initial)
|
||||
maxInterval time.Duration // Maximum interval (cap for backoff)
|
||||
backoffMultiplier float64 // Multiplier for each stable check
|
||||
stableCountToBackoff int // Number of stable checks before backing off
|
||||
}
|
||||
|
||||
// Dialer is a function that creates a connection
|
||||
@@ -50,18 +56,23 @@ type ConnectionStatus struct {
|
||||
// NewClient creates a new connection test client
|
||||
func NewClient(serverAddr string, dialer Dialer) (*Client, error) {
|
||||
return &Client{
|
||||
serverAddr: serverAddr,
|
||||
shutdownCh: make(chan struct{}),
|
||||
packetInterval: 2 * time.Second,
|
||||
timeout: 500 * time.Millisecond, // Timeout for individual packets
|
||||
maxAttempts: 3, // Default max attempts
|
||||
dialer: dialer,
|
||||
serverAddr: serverAddr,
|
||||
shutdownCh: make(chan struct{}),
|
||||
packetInterval: 2 * time.Second,
|
||||
minInterval: 2 * time.Second,
|
||||
maxInterval: 30 * time.Second,
|
||||
backoffMultiplier: 1.5,
|
||||
stableCountToBackoff: 3, // After 3 consecutive same-state results, start backing off
|
||||
timeout: 500 * time.Millisecond, // Timeout for individual packets
|
||||
maxAttempts: 3, // Default max attempts
|
||||
dialer: dialer,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SetPacketInterval changes how frequently packets are sent in monitor mode
|
||||
func (c *Client) SetPacketInterval(interval time.Duration) {
|
||||
c.packetInterval = interval
|
||||
c.minInterval = interval
|
||||
}
|
||||
|
||||
// SetTimeout changes the timeout for waiting for responses
|
||||
@@ -74,6 +85,16 @@ func (c *Client) SetMaxAttempts(attempts int) {
|
||||
c.maxAttempts = attempts
|
||||
}
|
||||
|
||||
// SetMaxInterval sets the maximum backoff interval
|
||||
func (c *Client) SetMaxInterval(interval time.Duration) {
|
||||
c.maxInterval = interval
|
||||
}
|
||||
|
||||
// SetBackoffMultiplier sets the multiplier for exponential backoff
|
||||
func (c *Client) SetBackoffMultiplier(multiplier float64) {
|
||||
c.backoffMultiplier = multiplier
|
||||
}
|
||||
|
||||
// UpdateServerAddr updates the server address and resets the connection
|
||||
func (c *Client) UpdateServerAddr(serverAddr string) {
|
||||
c.connLock.Lock()
|
||||
@@ -138,6 +159,9 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
|
||||
binary.BigEndian.PutUint32(packet[0:4], magicHeader)
|
||||
packet[4] = packetTypeRequest
|
||||
|
||||
// Reusable response buffer
|
||||
responseBuffer := make([]byte, packetSize)
|
||||
|
||||
// Send multiple attempts as specified
|
||||
for attempt := 0; attempt < c.maxAttempts; attempt++ {
|
||||
select {
|
||||
@@ -157,20 +181,17 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// logger.Debug("Attempting to send monitor packet to %s", c.serverAddr)
|
||||
_, err := c.conn.Write(packet)
|
||||
if err != nil {
|
||||
c.connLock.Unlock()
|
||||
logger.Info("Error sending packet: %v", err)
|
||||
continue
|
||||
}
|
||||
// logger.Debug("Successfully sent monitor packet")
|
||||
|
||||
// Set read deadline
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
|
||||
|
||||
// Wait for response
|
||||
responseBuffer := make([]byte, packetSize)
|
||||
n, err := c.conn.Read(responseBuffer)
|
||||
c.connLock.Unlock()
|
||||
|
||||
@@ -238,28 +259,50 @@ func (c *Client) StartMonitor(callback MonitorCallback) error {
|
||||
go func() {
|
||||
var lastConnected bool
|
||||
firstRun := true
|
||||
stableCount := 0
|
||||
currentInterval := c.minInterval
|
||||
|
||||
ticker := time.NewTicker(c.packetInterval)
|
||||
defer ticker.Stop()
|
||||
timer := time.NewTimer(currentInterval)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
case <-timer.C:
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
|
||||
connected, rtt := c.TestConnection(ctx)
|
||||
cancel()
|
||||
|
||||
statusChanged := connected != lastConnected
|
||||
|
||||
// Callback if status changed or it's the first check
|
||||
if connected != lastConnected || firstRun {
|
||||
if statusChanged || firstRun {
|
||||
callback(ConnectionStatus{
|
||||
Connected: connected,
|
||||
RTT: rtt,
|
||||
})
|
||||
lastConnected = connected
|
||||
firstRun = false
|
||||
// Reset backoff on status change
|
||||
stableCount = 0
|
||||
currentInterval = c.minInterval
|
||||
} else {
|
||||
// Status is stable, increment counter
|
||||
stableCount++
|
||||
|
||||
// Apply exponential backoff after stable threshold
|
||||
if stableCount >= c.stableCountToBackoff {
|
||||
newInterval := time.Duration(float64(currentInterval) * c.backoffMultiplier)
|
||||
if newInterval > c.maxInterval {
|
||||
newInterval = c.maxInterval
|
||||
}
|
||||
currentInterval = newInterval
|
||||
}
|
||||
}
|
||||
|
||||
// Reset timer with current interval
|
||||
timer.Reset(currentInterval)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -278,4 +321,4 @@ func (c *Client) StopMonitor() {
|
||||
|
||||
close(c.shutdownCh)
|
||||
c.monitorRunning = false
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user