Power state getting set correctly

This commit is contained in:
Owen
2026-01-14 16:38:40 -08:00
parent 303c2dc0b7
commit 0895156efd
8 changed files with 239 additions and 193 deletions

View File

@@ -28,14 +28,12 @@ import (
// PeerMonitor handles monitoring the connection status to multiple WireGuard peers
type PeerMonitor struct {
monitors map[int]*Client
mutex sync.Mutex
running bool
defaultInterval time.Duration
interval time.Duration
monitors map[int]*Client
mutex sync.Mutex
running bool
timeout time.Duration
maxAttempts int
wsClient *websocket.Client
maxAttempts int
wsClient *websocket.Client
// Netstack fields
middleDev *middleDevice.MiddleDevice
@@ -54,7 +52,8 @@ type PeerMonitor struct {
holepunchTimeout time.Duration
holepunchEndpoints map[int]string // siteID -> endpoint for holepunch testing
holepunchStatus map[int]bool // siteID -> connected status
holepunchStopChan chan struct{}
holepunchStopChan chan struct{}
holepunchUpdateChan chan struct{}
// Relay tracking fields
relayedPeers map[int]bool // siteID -> whether the peer is currently relayed
@@ -87,8 +86,6 @@ func NewPeerMonitor(wsClient *websocket.Client, middleDev *middleDevice.MiddleDe
ctx, cancel := context.WithCancel(context.Background())
pm := &PeerMonitor{
monitors: make(map[int]*Client),
defaultInterval: 2 * time.Second,
interval: 2 * time.Second, // Default check interval (faster)
timeout: 3 * time.Second,
maxAttempts: 3,
wsClient: wsClient,
@@ -118,6 +115,7 @@ func NewPeerMonitor(wsClient *websocket.Client, middleDev *middleDevice.MiddleDe
holepunchBackoffMultiplier: 1.5,
holepunchStableCount: make(map[int]int),
holepunchCurrentInterval: 2 * time.Second,
holepunchUpdateChan: make(chan struct{}, 1),
}
if err := pm.initNetstack(); err != nil {
@@ -133,82 +131,76 @@ func NewPeerMonitor(wsClient *websocket.Client, middleDev *middleDevice.MiddleDe
}
// SetInterval changes how frequently peers are checked
func (pm *PeerMonitor) SetInterval(interval time.Duration) {
func (pm *PeerMonitor) SetPeerInterval(minInterval, maxInterval time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.interval = interval
// Update interval for all existing monitors
for _, client := range pm.monitors {
client.SetPacketInterval(interval)
client.SetPacketInterval(minInterval, maxInterval)
}
logger.Info("Set peer monitor interval to min: %s, max: %s", minInterval, maxInterval)
}
func (pm *PeerMonitor) ResetInterval() {
func (pm *PeerMonitor) ResetPeerInterval() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.interval = pm.defaultInterval
// Update interval for all existing monitors
for _, client := range pm.monitors {
client.SetPacketInterval(pm.defaultInterval)
client.ResetPacketInterval()
}
}
// SetTimeout changes the timeout for waiting for responses
func (pm *PeerMonitor) SetTimeout(timeout time.Duration) {
// SetPeerHolepunchInterval sets both the minimum and maximum intervals for holepunch monitoring
func (pm *PeerMonitor) SetPeerHolepunchInterval(minInterval, maxInterval time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.timeout = timeout
// Update timeout for all existing monitors
for _, client := range pm.monitors {
client.SetTimeout(timeout)
}
}
// SetMaxAttempts changes the maximum number of attempts for TestConnection
func (pm *PeerMonitor) SetMaxAttempts(attempts int) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.maxAttempts = attempts
// Update max attempts for all existing monitors
for _, client := range pm.monitors {
client.SetMaxAttempts(attempts)
}
}
// SetHolepunchInterval sets both the minimum and maximum intervals for holepunch monitoring
func (pm *PeerMonitor) SetHolepunchInterval(minInterval, maxInterval time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.holepunchMinInterval = minInterval
pm.holepunchMaxInterval = maxInterval
// Reset current interval to the new minimum
pm.holepunchCurrentInterval = minInterval
updateChan := pm.holepunchUpdateChan
pm.mutex.Unlock()
logger.Info("Set holepunch interval to min: %s, max: %s", minInterval, maxInterval)
// Signal the goroutine to apply the new interval if running
if updateChan != nil {
select {
case updateChan <- struct{}{}:
default:
// Channel full or closed, skip
}
}
}
// GetHolepunchIntervals returns the current minimum and maximum intervals for holepunch monitoring
func (pm *PeerMonitor) GetHolepunchIntervals() (minInterval, maxInterval time.Duration) {
// GetPeerHolepunchIntervals returns the current minimum and maximum intervals for holepunch monitoring
func (pm *PeerMonitor) GetPeerHolepunchIntervals() (minInterval, maxInterval time.Duration) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
return pm.holepunchMinInterval, pm.holepunchMaxInterval
}
func (pm *PeerMonitor) ResetHolepunchInterval() {
func (pm *PeerMonitor) ResetPeerHolepunchInterval() {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.holepunchMinInterval = pm.defaultHolepunchMinInterval
pm.holepunchMaxInterval = pm.defaultHolepunchMaxInterval
pm.holepunchCurrentInterval = pm.defaultHolepunchMinInterval
updateChan := pm.holepunchUpdateChan
pm.mutex.Unlock()
logger.Info("Reset holepunch interval to defaults: min=%v, max=%v", pm.defaultHolepunchMinInterval, pm.defaultHolepunchMaxInterval)
// Signal the goroutine to apply the new interval if running
if updateChan != nil {
select {
case updateChan <- struct{}{}:
default:
// Channel full or closed, skip
}
}
}
// AddPeer adds a new peer to monitor
@@ -226,11 +218,6 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string, holepunchEndpoint st
return err
}
client.SetPacketInterval(pm.interval)
client.SetTimeout(pm.timeout)
client.SetMaxAttempts(pm.maxAttempts)
client.SetMaxInterval(30 * time.Second) // Allow backoff up to 30 seconds when stable
pm.monitors[siteID] = client
pm.holepunchEndpoints[siteID] = holepunchEndpoint
@@ -541,6 +528,15 @@ func (pm *PeerMonitor) runHolepunchMonitor() {
select {
case <-pm.holepunchStopChan:
return
case <-pm.holepunchUpdateChan:
// Interval settings changed, reset to minimum
pm.mutex.Lock()
pm.holepunchCurrentInterval = pm.holepunchMinInterval
currentInterval := pm.holepunchCurrentInterval
pm.mutex.Unlock()
timer.Reset(currentInterval)
logger.Debug("Holepunch monitor interval updated, reset to %v", currentInterval)
case <-timer.C:
anyStatusChanged := pm.checkHolepunchEndpoints()
@@ -584,7 +580,7 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() bool {
anyStatusChanged := false
for siteID, endpoint := range endpoints {
// logger.Debug("Testing holepunch endpoint for site %d: %s", siteID, endpoint)
logger.Debug("holepunchTester: testing endpoint for site %d: %s", siteID, endpoint)
result := pm.holepunchTester.TestEndpoint(endpoint, timeout)
pm.mutex.Lock()
@@ -733,55 +729,55 @@ func (pm *PeerMonitor) Close() {
logger.Debug("PeerMonitor: Cleanup complete")
}
// TestPeer tests connectivity to a specific peer
func (pm *PeerMonitor) TestPeer(siteID int) (bool, time.Duration, error) {
pm.mutex.Lock()
client, exists := pm.monitors[siteID]
pm.mutex.Unlock()
// // TestPeer tests connectivity to a specific peer
// func (pm *PeerMonitor) TestPeer(siteID int) (bool, time.Duration, error) {
// pm.mutex.Lock()
// client, exists := pm.monitors[siteID]
// pm.mutex.Unlock()
if !exists {
return false, 0, fmt.Errorf("peer with siteID %d not found", siteID)
}
// if !exists {
// return false, 0, fmt.Errorf("peer with siteID %d not found", siteID)
// }
ctx, cancel := context.WithTimeout(context.Background(), pm.timeout*time.Duration(pm.maxAttempts))
defer cancel()
// ctx, cancel := context.WithTimeout(context.Background(), pm.timeout*time.Duration(pm.maxAttempts))
// defer cancel()
connected, rtt := client.TestConnection(ctx)
return connected, rtt, nil
}
// connected, rtt := client.TestPeerConnection(ctx)
// return connected, rtt, nil
// }
// TestAllPeers tests connectivity to all peers
func (pm *PeerMonitor) TestAllPeers() map[int]struct {
Connected bool
RTT time.Duration
} {
pm.mutex.Lock()
peers := make(map[int]*Client, len(pm.monitors))
for siteID, client := range pm.monitors {
peers[siteID] = client
}
pm.mutex.Unlock()
// // TestAllPeers tests connectivity to all peers
// func (pm *PeerMonitor) TestAllPeers() map[int]struct {
// Connected bool
// RTT time.Duration
// } {
// pm.mutex.Lock()
// peers := make(map[int]*Client, len(pm.monitors))
// for siteID, client := range pm.monitors {
// peers[siteID] = client
// }
// pm.mutex.Unlock()
results := make(map[int]struct {
Connected bool
RTT time.Duration
})
for siteID, client := range peers {
ctx, cancel := context.WithTimeout(context.Background(), pm.timeout*time.Duration(pm.maxAttempts))
connected, rtt := client.TestConnection(ctx)
cancel()
// results := make(map[int]struct {
// Connected bool
// RTT time.Duration
// })
// for siteID, client := range peers {
// ctx, cancel := context.WithTimeout(context.Background(), pm.timeout*time.Duration(pm.maxAttempts))
// connected, rtt := client.TestPeerConnection(ctx)
// cancel()
results[siteID] = struct {
Connected bool
RTT time.Duration
}{
Connected: connected,
RTT: rtt,
}
}
// results[siteID] = struct {
// Connected bool
// RTT time.Duration
// }{
// Connected: connected,
// RTT: rtt,
// }
// }
return results
}
// return results
// }
// initNetstack initializes the gvisor netstack
func (pm *PeerMonitor) initNetstack() error {

View File

@@ -32,16 +32,19 @@ type Client struct {
monitorLock sync.Mutex
connLock sync.Mutex // Protects connection operations
shutdownCh chan struct{}
updateCh chan struct{}
packetInterval time.Duration
timeout time.Duration
maxAttempts int
dialer Dialer
// Exponential backoff fields
minInterval time.Duration // Minimum interval (initial)
maxInterval time.Duration // Maximum interval (cap for backoff)
backoffMultiplier float64 // Multiplier for each stable check
stableCountToBackoff int // Number of stable checks before backing off
defaultMinInterval time.Duration // Default minimum interval (initial)
defaultMaxInterval time.Duration // Default maximum interval (cap for backoff)
minInterval time.Duration // Minimum interval (initial)
maxInterval time.Duration // Maximum interval (cap for backoff)
backoffMultiplier float64 // Multiplier for each stable check
stableCountToBackoff int // Number of stable checks before backing off
}
// Dialer is a function that creates a connection
@@ -56,43 +59,59 @@ type ConnectionStatus struct {
// NewClient creates a new connection test client
func NewClient(serverAddr string, dialer Dialer) (*Client, error) {
return &Client{
serverAddr: serverAddr,
shutdownCh: make(chan struct{}),
packetInterval: 2 * time.Second,
minInterval: 2 * time.Second,
maxInterval: 30 * time.Second,
backoffMultiplier: 1.5,
stableCountToBackoff: 3, // After 3 consecutive same-state results, start backing off
timeout: 500 * time.Millisecond, // Timeout for individual packets
maxAttempts: 3, // Default max attempts
dialer: dialer,
serverAddr: serverAddr,
shutdownCh: make(chan struct{}),
updateCh: make(chan struct{}, 1),
packetInterval: 2 * time.Second,
defaultMinInterval: 2 * time.Second,
defaultMaxInterval: 30 * time.Second,
minInterval: 2 * time.Second,
maxInterval: 30 * time.Second,
backoffMultiplier: 1.5,
stableCountToBackoff: 3, // After 3 consecutive same-state results, start backing off
timeout: 500 * time.Millisecond, // Timeout for individual packets
maxAttempts: 3, // Default max attempts
dialer: dialer,
}, nil
}
// SetPacketInterval changes how frequently packets are sent in monitor mode
func (c *Client) SetPacketInterval(interval time.Duration) {
c.packetInterval = interval
c.minInterval = interval
func (c *Client) SetPacketInterval(minInterval, maxInterval time.Duration) {
c.monitorLock.Lock()
c.packetInterval = minInterval
c.minInterval = minInterval
c.maxInterval = maxInterval
updateCh := c.updateCh
monitorRunning := c.monitorRunning
c.monitorLock.Unlock()
// Signal the goroutine to apply the new interval if running
if monitorRunning && updateCh != nil {
select {
case updateCh <- struct{}{}:
default:
// Channel full or closed, skip
}
}
}
// SetTimeout changes the timeout for waiting for responses
func (c *Client) SetTimeout(timeout time.Duration) {
c.timeout = timeout
}
func (c *Client) ResetPacketInterval() {
c.monitorLock.Lock()
c.packetInterval = c.defaultMinInterval
c.minInterval = c.defaultMinInterval
c.maxInterval = c.defaultMaxInterval
updateCh := c.updateCh
monitorRunning := c.monitorRunning
c.monitorLock.Unlock()
// SetMaxAttempts changes the maximum number of attempts for TestConnection
func (c *Client) SetMaxAttempts(attempts int) {
c.maxAttempts = attempts
}
// SetMaxInterval sets the maximum backoff interval
func (c *Client) SetMaxInterval(interval time.Duration) {
c.maxInterval = interval
}
// SetBackoffMultiplier sets the multiplier for exponential backoff
func (c *Client) SetBackoffMultiplier(multiplier float64) {
c.backoffMultiplier = multiplier
// Signal the goroutine to apply the new interval if running
if monitorRunning && updateCh != nil {
select {
case updateCh <- struct{}{}:
default:
// Channel full or closed, skip
}
}
}
// UpdateServerAddr updates the server address and resets the connection
@@ -146,9 +165,10 @@ func (c *Client) ensureConnection() error {
return nil
}
// TestConnection checks if the connection to the server is working
// TestPeerConnection checks if the connection to the server is working
// Returns true if connected, false otherwise
func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
func (c *Client) TestPeerConnection(ctx context.Context) (bool, time.Duration) {
logger.Debug("wgtester: testing connection to peer %s", c.serverAddr)
if err := c.ensureConnection(); err != nil {
logger.Warn("Failed to ensure connection: %v", err)
return false, 0
@@ -232,7 +252,7 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
func (c *Client) TestConnectionWithTimeout(timeout time.Duration) (bool, time.Duration) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.TestConnection(ctx)
return c.TestPeerConnection(ctx)
}
// MonitorCallback is the function type for connection status change callbacks
@@ -269,9 +289,20 @@ func (c *Client) StartMonitor(callback MonitorCallback) error {
select {
case <-c.shutdownCh:
return
case <-c.updateCh:
// Interval settings changed, reset to minimum
c.monitorLock.Lock()
currentInterval = c.minInterval
c.monitorLock.Unlock()
// Reset backoff state
stableCount = 0
timer.Reset(currentInterval)
logger.Debug("Packet interval updated, reset to %v", currentInterval)
case <-timer.C:
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
connected, rtt := c.TestConnection(ctx)
connected, rtt := c.TestPeerConnection(ctx)
cancel()
statusChanged := connected != lastConnected
@@ -321,4 +352,4 @@ func (c *Client) StopMonitor() {
close(c.shutdownCh)
c.monitorRunning = false
}
}