From 51162d6be63c4886c2c59dcd35cab1f44f1f840e Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 1 Dec 2025 21:55:11 -0500 Subject: [PATCH] Further adjust structure to include peer monitor Former-commit-id: 5a2918b2a4284941ae19331730b3b6cb50d95012 --- olm/olm.go | 37 +++++--------------------------- peers/manager.go | 46 ++++++++++++++++++++++++++++------------ peers/monitor/monitor.go | 17 +++++++++++++-- peers/{wg.go => peer.go} | 0 4 files changed, 53 insertions(+), 47 deletions(-) rename peers/{wg.go => peer.go} (100%) diff --git a/olm/olm.go b/olm/olm.go index 6401984..ee36c29 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -407,28 +407,7 @@ func StartTunnel(config TunnelConfig) { LocalIP: interfaceIP, SharedBind: sharedBind, WSClient: wsClientForMonitor, - StatusCallback: func(siteID int, connected bool, rtt time.Duration) { - // Find the site config to get endpoint information - var endpoint string - var isRelay bool - for _, site := range wgData.Sites { - if site.SiteId == siteID { - if site.RelayEndpoint != "" { - endpoint = site.RelayEndpoint - } else { - endpoint = site.Endpoint - } - isRelay = site.RelayEndpoint != "" - break - } - } - apiServer.UpdatePeerStatus(siteID, connected, rtt, endpoint, isRelay) - if connected { - logger.Info("Peer %d is now connected (RTT: %v)", siteID, rtt) - } else { - logger.Warn("Peer %d is disconnected", siteID) - } - }, + APIServer: apiServer, }) for i := range wgData.Sites { @@ -450,14 +429,12 @@ func StartTunnel(config TunnelConfig) { logger.Info("Configured peer %s", site.PublicKey) } - peerManager.SetHolepunchStatusCallback(func(siteID int, endpoint string, connected bool, rtt time.Duration) { - // This callback is for additional handling if needed - // The PeerMonitor already logs status changes - logger.Info("+++++++++++++++++++++++++ holepunch monitor callback for site %d, endpoint %s, connected: %v, rtt: %v", siteID, endpoint, connected, rtt) - }) - peerManager.Start() + if err := dnsProxy.Start(); err != nil { // start DNS proxy first so there is no downtime + logger.Error("Failed to start DNS proxy: %v", err) + } + if config.OverrideDNS { // Set up DNS override to use our DNS proxy if err := dnsOverride.SetupDNSOverride(interfaceName, dnsProxy); err != nil { @@ -466,10 +443,6 @@ func StartTunnel(config TunnelConfig) { } } - if err := dnsProxy.Start(); err != nil { - logger.Error("Failed to start DNS proxy: %v", err) - } - apiServer.SetRegistered(true) connected = true diff --git a/peers/manager.go b/peers/manager.go index 12631b0..4cd8332 100644 --- a/peers/manager.go +++ b/peers/manager.go @@ -11,6 +11,7 @@ import ( "github.com/fosrl/newt/bind" "github.com/fosrl/newt/logger" "github.com/fosrl/newt/network" + "github.com/fosrl/olm/api" olmDevice "github.com/fosrl/olm/device" "github.com/fosrl/olm/dns" "github.com/fosrl/olm/peers/monitor" @@ -19,9 +20,6 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) -// PeerStatusCallback is called when a peer's connection status changes -type PeerStatusCallback func(siteID int, connected bool, rtt time.Duration) - // HolepunchStatusCallback is called when holepunch connection status changes // This is an alias for monitor.HolepunchStatusCallback type HolepunchStatusCallback = monitor.HolepunchStatusCallback @@ -37,9 +35,8 @@ type PeerManagerConfig struct { LocalIP string SharedBind *bind.SharedBind // WSClient is optional - if nil, relay messages won't be sent - WSClient *websocket.Client - // StatusCallback is called when peer connection status changes - StatusCallback PeerStatusCallback + WSClient *websocket.Client + APIServer *api.API } type PeerManager struct { @@ -56,8 +53,7 @@ type PeerManager struct { // allowedIPClaims tracks all peers that claim each allowed IP // key is the CIDR string, value is a set of siteIds that want this IP allowedIPClaims map[string]map[int]bool - // statusCallback is called when peer connection status changes - statusCallback PeerStatusCallback + APIServer *api.API } // NewPeerManager creates a new PeerManager with an internal PeerMonitor @@ -70,15 +66,37 @@ func NewPeerManager(config PeerManagerConfig) *PeerManager { privateKey: config.PrivateKey, allowedIPOwners: make(map[string]int), allowedIPClaims: make(map[string]map[int]bool), - statusCallback: config.StatusCallback, + APIServer: config.APIServer, } // Create the peer monitor pm.peerMonitor = monitor.NewPeerMonitor( func(siteID int, connected bool, rtt time.Duration) { - // Call the external status callback if set - if pm.statusCallback != nil { - pm.statusCallback(siteID, connected, rtt) + // Update API status directly + if pm.APIServer != nil { + // Find the peer config to get endpoint information + pm.mu.RLock() + peer, exists := pm.peers[siteID] + pm.mu.RUnlock() + + var endpoint string + var isRelay bool + if exists { + if peer.RelayEndpoint != "" { + endpoint = peer.RelayEndpoint + isRelay = true + } else { + endpoint = peer.Endpoint + isRelay = false + } + } + pm.APIServer.UpdatePeerStatus(siteID, connected, rtt, endpoint, isRelay) + } + + if connected { + logger.Info("Peer %d is now connected (RTT: %v)", siteID, rtt) + } else { + logger.Warn("Peer %d is disconnected", siteID) } }, config.WSClient, @@ -154,7 +172,7 @@ func (pm *PeerManager) AddPeer(siteConfig SiteConfig, endpoint string) error { monitorAddress := strings.Split(siteConfig.ServerIP, "/")[0] monitorPeer := net.JoinHostPort(monitorAddress, strconv.Itoa(int(siteConfig.ServerPort+1))) // +1 for the monitor port - err := pm.peerMonitor.AddPeer(siteConfig.SiteId, monitorPeer) + err := pm.peerMonitor.AddPeer(siteConfig.SiteId, monitorPeer, siteConfig.Endpoint) // always use the real site endpoint for hole punch monitoring if err != nil { logger.Warn("Failed to setup monitoring for site %d: %v", siteConfig.SiteId, err) } else { @@ -371,6 +389,8 @@ func (pm *PeerManager) UpdatePeer(siteConfig SiteConfig, endpoint string) error pm.dnsProxy.AddDNSRecord(alias.Alias, address) } + pm.peerMonitor.UpdateHolepunchEndpoint(siteConfig.SiteId, siteConfig.Endpoint) + pm.peers[siteConfig.SiteId] = siteConfig return nil } diff --git a/peers/monitor/monitor.go b/peers/monitor/monitor.go index 9a02408..d7055d2 100644 --- a/peers/monitor/monitor.go +++ b/peers/monitor/monitor.go @@ -138,7 +138,7 @@ func (pm *PeerMonitor) SetMaxAttempts(attempts int) { } // AddPeer adds a new peer to monitor -func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error { +func (pm *PeerMonitor) AddPeer(siteID int, endpoint string, holepunchEndpoint string) error { pm.mutex.Lock() defer pm.mutex.Unlock() @@ -157,7 +157,8 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error { client.SetMaxAttempts(pm.maxAttempts) pm.monitors[siteID] = client - pm.holepunchEndpoints[siteID] = endpoint + + pm.holepunchEndpoints[siteID] = holepunchEndpoint pm.holepunchStatus[siteID] = false // Initially unknown/disconnected if pm.running { @@ -171,6 +172,14 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error { return nil } +// update holepunch endpoint for a peer +func (pm *PeerMonitor) UpdateHolepunchEndpoint(siteID int, endpoint string) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + pm.holepunchEndpoints[siteID] = endpoint +} + // removePeerUnlocked stops monitoring a peer and removes it from the monitor // This function assumes the mutex is already held by the caller func (pm *PeerMonitor) removePeerUnlocked(siteID int) { @@ -189,6 +198,10 @@ func (pm *PeerMonitor) RemovePeer(siteID int) { pm.mutex.Lock() defer pm.mutex.Unlock() + // remove the holepunch endpoint info + delete(pm.holepunchEndpoints, siteID) + delete(pm.holepunchStatus, siteID) + pm.removePeerUnlocked(siteID) } diff --git a/peers/wg.go b/peers/peer.go similarity index 100% rename from peers/wg.go rename to peers/peer.go