Move failover command to monitor

This commit is contained in:
Owen
2025-12-01 20:21:05 -05:00
parent e5977013b0
commit 23e7b173c9
4 changed files with 51 additions and 64 deletions

View File

@@ -392,6 +392,12 @@ func StartTunnel(config TunnelConfig) {
interfaceIP = strings.Split(interfaceIP, "/")[0]
}
// Determine if we should send relay messages (only when holepunching is enabled and relay is not disabled)
var wsClientForMonitor *websocket.Client
if config.Holepunch && !config.DisableRelay {
wsClientForMonitor = olm
}
peerMonitor = peermonitor.NewPeerMonitor(
func(siteID int, connected bool, rtt time.Duration) {
// Find the site config to get endpoint information
@@ -413,10 +419,7 @@ func StartTunnel(config TunnelConfig) {
logger.Warn("Peer %d is disconnected", siteID)
}
},
util.FixKey(privateKey.String()),
olm,
dev,
config.Holepunch && !config.DisableRelay, // Enable relay only if holepunching is enabled and DisableRelay is false
wsClientForMonitor,
middleDev,
interfaceIP,
sharedBind, // Pass sharedBind for holepunch testing
@@ -710,7 +713,7 @@ func StartTunnel(config TunnelConfig) {
// Update HTTP server to mark this peer as using relay
apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.Endpoint, true)
peerMonitor.HandleFailover(relayData.SiteId, primaryRelay)
peerManager.HandleFailover(relayData.SiteId, primaryRelay)
})
// Handler for peer handshake - adds exit node to holepunch rotation and notifies server

View File

@@ -27,9 +27,6 @@ type GlobalConfig struct {
OnConnected func()
OnTerminated func()
OnAuthError func(statusCode int, message string) // Called when auth fails (401/403)
// Source tracking (not in JSON)
sources map[string]string
}
type TunnelConfig struct {

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"net"
"net/netip"
"strings"
"sync"
"time"
@@ -15,7 +14,6 @@ import (
"github.com/fosrl/newt/util"
middleDevice "github.com/fosrl/olm/device"
"github.com/fosrl/olm/websocket"
"golang.zx2c4.com/wireguard/device"
"gvisor.dev/gvisor/pkg/buffer"
"gvisor.dev/gvisor/pkg/tcpip"
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
@@ -44,18 +42,15 @@ type WireGuardConfig struct {
// PeerMonitor handles monitoring the connection status to multiple WireGuard peers
type PeerMonitor struct {
monitors map[int]*Client
configs map[int]*WireGuardConfig
callback PeerMonitorCallback
mutex sync.Mutex
running bool
interval time.Duration
timeout time.Duration
maxAttempts int
privateKey string
wsClient *websocket.Client
device *device.Device
handleRelaySwitch bool // Whether to handle relay switching
monitors map[int]*Client
configs map[int]*WireGuardConfig
callback PeerMonitorCallback
mutex sync.Mutex
running bool
interval time.Duration
timeout time.Duration
maxAttempts int
wsClient *websocket.Client
// Netstack fields
middleDev *middleDevice.MiddleDevice
@@ -80,7 +75,7 @@ type PeerMonitor struct {
}
// NewPeerMonitor creates a new peer monitor with the given callback
func NewPeerMonitor(callback PeerMonitorCallback, privateKey string, wsClient *websocket.Client, device *device.Device, handleRelaySwitch bool, middleDev *middleDevice.MiddleDevice, localIP string, sharedBind *bind.SharedBind) *PeerMonitor {
func NewPeerMonitor(callback PeerMonitorCallback, wsClient *websocket.Client, middleDev *middleDevice.MiddleDevice, localIP string, sharedBind *bind.SharedBind) *PeerMonitor {
ctx, cancel := context.WithCancel(context.Background())
pm := &PeerMonitor{
monitors: make(map[int]*Client),
@@ -89,10 +84,7 @@ func NewPeerMonitor(callback PeerMonitorCallback, privateKey string, wsClient *w
interval: 1 * time.Second, // Default check interval
timeout: 2500 * time.Millisecond,
maxAttempts: 15,
privateKey: privateKey,
wsClient: wsClient,
device: device,
handleRelaySwitch: handleRelaySwitch,
middleDev: middleDev,
localIP: localIP,
activePorts: make(map[uint16]bool),
@@ -245,53 +237,16 @@ func (pm *PeerMonitor) handleConnectionStatusChange(siteID int, status Connectio
pm.callback(siteID, status.Connected, status.RTT)
}
// If disconnected, handle failover
// If disconnected, send relay message to the server
if !status.Connected {
// Send relay message to the server
if pm.wsClient != nil {
pm.sendRelay(siteID)
}
}
}
// handleFailover handles failover to the relay server when a peer is disconnected
func (pm *PeerMonitor) HandleFailover(siteID int, relayEndpoint string) {
pm.mutex.Lock()
config, exists := pm.configs[siteID]
pm.mutex.Unlock()
if !exists {
return
}
// Check for IPv6 and format the endpoint correctly
formattedEndpoint := relayEndpoint
if strings.Contains(relayEndpoint, ":") {
formattedEndpoint = fmt.Sprintf("[%s]", relayEndpoint)
}
// Configure WireGuard to use the relay
wgConfig := fmt.Sprintf(`private_key=%s
public_key=%s
allowed_ip=%s/32
endpoint=%s:21820
persistent_keepalive_interval=1`, pm.privateKey, config.PublicKey, config.ServerIP, formattedEndpoint)
err := pm.device.IpcSet(wgConfig)
if err != nil {
logger.Error("Failed to configure WireGuard device: %v\n", err)
return
}
logger.Info("Adjusted peer %d to point to relay!\n", siteID)
}
// sendRelay sends a relay message to the server
func (pm *PeerMonitor) sendRelay(siteID int) error {
if !pm.handleRelaySwitch {
return nil
}
if pm.wsClient == nil {
return fmt.Errorf("websocket client is nil")
}

View File

@@ -3,6 +3,7 @@ package peers
import (
"fmt"
"net"
"strings"
"sync"
"github.com/fosrl/newt/logger"
@@ -594,3 +595,34 @@ func (pm *PeerManager) RemoveAlias(siteId int, aliasName string) error {
return nil
}
// HandleFailover handles failover to the relay server when a peer is disconnected
func (pm *PeerManager) HandleFailover(siteId int, relayEndpoint string) {
pm.mu.RLock()
peer, exists := pm.peers[siteId]
pm.mu.RUnlock()
if !exists {
logger.Error("Cannot handle failover: peer with site ID %d not found", siteId)
return
}
// Check for IPv6 and format the endpoint correctly
formattedEndpoint := relayEndpoint
if strings.Contains(relayEndpoint, ":") {
formattedEndpoint = fmt.Sprintf("[%s]", relayEndpoint)
}
// Update only the endpoint for this peer (update_only preserves other settings)
wgConfig := fmt.Sprintf(`public_key=%s
update_only=true
endpoint=%s:21820`, peer.PublicKey, formattedEndpoint)
err := pm.device.IpcSet(wgConfig)
if err != nil {
logger.Error("Failed to configure WireGuard device: %v\n", err)
return
}
logger.Info("Adjusted peer %d to point to relay!\n", siteId)
}