From 23e7b173c94bbe758eedcd059deac382c596b676 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 1 Dec 2025 20:21:05 -0500 Subject: [PATCH] Move failover command to monitor --- olm/olm.go | 13 +++++--- olm/types.go | 3 -- peermonitor/peermonitor.go | 67 +++++++------------------------------- peers/manager.go | 32 ++++++++++++++++++ 4 files changed, 51 insertions(+), 64 deletions(-) diff --git a/olm/olm.go b/olm/olm.go index 264e651..da04daf 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -392,6 +392,12 @@ func StartTunnel(config TunnelConfig) { interfaceIP = strings.Split(interfaceIP, "/")[0] } + // Determine if we should send relay messages (only when holepunching is enabled and relay is not disabled) + var wsClientForMonitor *websocket.Client + if config.Holepunch && !config.DisableRelay { + wsClientForMonitor = olm + } + peerMonitor = peermonitor.NewPeerMonitor( func(siteID int, connected bool, rtt time.Duration) { // Find the site config to get endpoint information @@ -413,10 +419,7 @@ func StartTunnel(config TunnelConfig) { logger.Warn("Peer %d is disconnected", siteID) } }, - util.FixKey(privateKey.String()), - olm, - dev, - config.Holepunch && !config.DisableRelay, // Enable relay only if holepunching is enabled and DisableRelay is false + wsClientForMonitor, middleDev, interfaceIP, sharedBind, // Pass sharedBind for holepunch testing @@ -710,7 +713,7 @@ func StartTunnel(config TunnelConfig) { // Update HTTP server to mark this peer as using relay apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.Endpoint, true) - peerMonitor.HandleFailover(relayData.SiteId, primaryRelay) + peerManager.HandleFailover(relayData.SiteId, primaryRelay) }) // Handler for peer handshake - adds exit node to holepunch rotation and notifies server diff --git a/olm/types.go b/olm/types.go index 5f384b7..8504b77 100644 --- a/olm/types.go +++ b/olm/types.go @@ -27,9 +27,6 @@ type GlobalConfig struct { OnConnected func() OnTerminated func() OnAuthError func(statusCode int, message string) // Called when auth fails (401/403) - - // Source tracking (not in JSON) - sources map[string]string } type TunnelConfig struct { diff --git a/peermonitor/peermonitor.go b/peermonitor/peermonitor.go index b83f705..59856a6 100644 --- a/peermonitor/peermonitor.go +++ b/peermonitor/peermonitor.go @@ -5,7 +5,6 @@ import ( "fmt" "net" "net/netip" - "strings" "sync" "time" @@ -15,7 +14,6 @@ import ( "github.com/fosrl/newt/util" middleDevice "github.com/fosrl/olm/device" "github.com/fosrl/olm/websocket" - "golang.zx2c4.com/wireguard/device" "gvisor.dev/gvisor/pkg/buffer" "gvisor.dev/gvisor/pkg/tcpip" "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" @@ -44,18 +42,15 @@ type WireGuardConfig struct { // PeerMonitor handles monitoring the connection status to multiple WireGuard peers type PeerMonitor struct { - monitors map[int]*Client - configs map[int]*WireGuardConfig - callback PeerMonitorCallback - mutex sync.Mutex - running bool - interval time.Duration - timeout time.Duration - maxAttempts int - privateKey string - wsClient *websocket.Client - device *device.Device - handleRelaySwitch bool // Whether to handle relay switching + monitors map[int]*Client + configs map[int]*WireGuardConfig + callback PeerMonitorCallback + mutex sync.Mutex + running bool + interval time.Duration + timeout time.Duration + maxAttempts int + wsClient *websocket.Client // Netstack fields middleDev *middleDevice.MiddleDevice @@ -80,7 +75,7 @@ type PeerMonitor struct { } // NewPeerMonitor creates a new peer monitor with the given callback -func NewPeerMonitor(callback PeerMonitorCallback, privateKey string, wsClient *websocket.Client, device *device.Device, handleRelaySwitch bool, middleDev *middleDevice.MiddleDevice, localIP string, sharedBind *bind.SharedBind) *PeerMonitor { +func NewPeerMonitor(callback PeerMonitorCallback, wsClient *websocket.Client, middleDev *middleDevice.MiddleDevice, localIP string, sharedBind *bind.SharedBind) *PeerMonitor { ctx, cancel := context.WithCancel(context.Background()) pm := &PeerMonitor{ monitors: make(map[int]*Client), @@ -89,10 +84,7 @@ func NewPeerMonitor(callback PeerMonitorCallback, privateKey string, wsClient *w interval: 1 * time.Second, // Default check interval timeout: 2500 * time.Millisecond, maxAttempts: 15, - privateKey: privateKey, wsClient: wsClient, - device: device, - handleRelaySwitch: handleRelaySwitch, middleDev: middleDev, localIP: localIP, activePorts: make(map[uint16]bool), @@ -245,53 +237,16 @@ func (pm *PeerMonitor) handleConnectionStatusChange(siteID int, status Connectio pm.callback(siteID, status.Connected, status.RTT) } - // If disconnected, handle failover + // If disconnected, send relay message to the server if !status.Connected { - // Send relay message to the server if pm.wsClient != nil { pm.sendRelay(siteID) } } } -// handleFailover handles failover to the relay server when a peer is disconnected -func (pm *PeerMonitor) HandleFailover(siteID int, relayEndpoint string) { - pm.mutex.Lock() - config, exists := pm.configs[siteID] - pm.mutex.Unlock() - - if !exists { - return - } - - // Check for IPv6 and format the endpoint correctly - formattedEndpoint := relayEndpoint - if strings.Contains(relayEndpoint, ":") { - formattedEndpoint = fmt.Sprintf("[%s]", relayEndpoint) - } - - // Configure WireGuard to use the relay - wgConfig := fmt.Sprintf(`private_key=%s -public_key=%s -allowed_ip=%s/32 -endpoint=%s:21820 -persistent_keepalive_interval=1`, pm.privateKey, config.PublicKey, config.ServerIP, formattedEndpoint) - - err := pm.device.IpcSet(wgConfig) - if err != nil { - logger.Error("Failed to configure WireGuard device: %v\n", err) - return - } - - logger.Info("Adjusted peer %d to point to relay!\n", siteID) -} - // sendRelay sends a relay message to the server func (pm *PeerMonitor) sendRelay(siteID int) error { - if !pm.handleRelaySwitch { - return nil - } - if pm.wsClient == nil { return fmt.Errorf("websocket client is nil") } diff --git a/peers/manager.go b/peers/manager.go index c837d22..7b18350 100644 --- a/peers/manager.go +++ b/peers/manager.go @@ -3,6 +3,7 @@ package peers import ( "fmt" "net" + "strings" "sync" "github.com/fosrl/newt/logger" @@ -594,3 +595,34 @@ func (pm *PeerManager) RemoveAlias(siteId int, aliasName string) error { return nil } + +// HandleFailover handles failover to the relay server when a peer is disconnected +func (pm *PeerManager) HandleFailover(siteId int, relayEndpoint string) { + pm.mu.RLock() + peer, exists := pm.peers[siteId] + pm.mu.RUnlock() + + if !exists { + logger.Error("Cannot handle failover: peer with site ID %d not found", siteId) + return + } + + // Check for IPv6 and format the endpoint correctly + formattedEndpoint := relayEndpoint + if strings.Contains(relayEndpoint, ":") { + formattedEndpoint = fmt.Sprintf("[%s]", relayEndpoint) + } + + // Update only the endpoint for this peer (update_only preserves other settings) + wgConfig := fmt.Sprintf(`public_key=%s +update_only=true +endpoint=%s:21820`, peer.PublicKey, formattedEndpoint) + + err := pm.device.IpcSet(wgConfig) + if err != nil { + logger.Error("Failed to configure WireGuard device: %v\n", err) + return + } + + logger.Info("Adjusted peer %d to point to relay!\n", siteId) +}