From 7d83518951a93224e9f5a68f8a2d698de6997c2f Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 20 Mar 2026 17:28:39 -0700 Subject: [PATCH] Get peer manager --- olm/connect.go | 5 +++-- olm/data.go | 49 ++++++++++++++++++++++++++++++++++--------------- olm/olm.go | 24 ++++++++++++++++++------ olm/peer.go | 38 ++++++++++++++++++++++++-------------- 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/olm/connect.go b/olm/connect.go index 3a2000c..9f80ce3 100644 --- a/olm/connect.go +++ b/olm/connect.go @@ -205,12 +205,13 @@ func (o *Olm) handleConnect(msg websocket.WSMessage) { // Register JIT handler: when the DNS proxy resolves a local record, check whether // the owning site is already connected and, if not, initiate a JIT connection. o.dnsProxy.SetJITHandler(func(siteId int) { - if o.peerManager == nil || o.websocket == nil { + pm := o.getPeerManager() + if pm == nil || o.websocket == nil { return } // Site already has an active peer connection - nothing to do. - if _, exists := o.peerManager.GetPeer(siteId); exists { + if _, exists := pm.GetPeer(siteId); exists { return } diff --git a/olm/data.go b/olm/data.go index d0e6d5b..5e6299a 100644 --- a/olm/data.go +++ b/olm/data.go @@ -32,21 +32,27 @@ func (o *Olm) handleWgPeerAddData(msg websocket.WSMessage) { return } - if _, exists := o.peerManager.GetPeer(addSubnetsData.SiteId); !exists { + pm := o.getPeerManager() + if pm == nil { + logger.Debug("Ignoring add-remote-subnets-aliases message: peerManager is nil (shutdown in progress)") + return + } + + if _, exists := pm.GetPeer(addSubnetsData.SiteId); !exists { logger.Debug("Peer %d not found for removing remote subnets and aliases", addSubnetsData.SiteId) return } // Add new subnets for _, subnet := range addSubnetsData.RemoteSubnets { - if err := o.peerManager.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil { + if err := pm.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil { logger.Error("Failed to add allowed IP %s: %v", subnet, err) } } // Add new aliases for _, alias := range addSubnetsData.Aliases { - if err := o.peerManager.AddAlias(addSubnetsData.SiteId, alias); err != nil { + if err := pm.AddAlias(addSubnetsData.SiteId, alias); err != nil { logger.Error("Failed to add alias %s: %v", alias.Alias, err) } } @@ -73,21 +79,27 @@ func (o *Olm) handleWgPeerRemoveData(msg websocket.WSMessage) { return } - if _, exists := o.peerManager.GetPeer(removeSubnetsData.SiteId); !exists { + pm := o.getPeerManager() + if pm == nil { + logger.Debug("Ignoring remove-remote-subnets-aliases message: peerManager is nil (shutdown in progress)") + return + } + + if _, exists := pm.GetPeer(removeSubnetsData.SiteId); !exists { logger.Debug("Peer %d not found for removing remote subnets and aliases", removeSubnetsData.SiteId) return } // Remove subnets for _, subnet := range removeSubnetsData.RemoteSubnets { - if err := o.peerManager.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil { + if err := pm.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil { logger.Error("Failed to remove allowed IP %s: %v", subnet, err) } } // Remove aliases for _, alias := range removeSubnetsData.Aliases { - if err := o.peerManager.RemoveAlias(removeSubnetsData.SiteId, alias.Alias); err != nil { + if err := pm.RemoveAlias(removeSubnetsData.SiteId, alias.Alias); err != nil { logger.Error("Failed to remove alias %s: %v", alias.Alias, err) } } @@ -114,7 +126,13 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) { return } - if _, exists := o.peerManager.GetPeer(updateSubnetsData.SiteId); !exists { + pm := o.getPeerManager() + if pm == nil { + logger.Debug("Ignoring update-remote-subnets-aliases message: peerManager is nil (shutdown in progress)") + return + } + + if _, exists := pm.GetPeer(updateSubnetsData.SiteId); !exists { logger.Debug("Peer %d not found for updating remote subnets and aliases", updateSubnetsData.SiteId) return } @@ -123,14 +141,14 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) { // This ensures that if an old and new subnet are the same on different peers, // the route won't be temporarily removed for _, subnet := range updateSubnetsData.NewRemoteSubnets { - if err := o.peerManager.AddRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil { + if err := pm.AddRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil { logger.Error("Failed to add allowed IP %s: %v", subnet, err) } } // Remove old subnets after new ones are added for _, subnet := range updateSubnetsData.OldRemoteSubnets { - if err := o.peerManager.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil { + if err := pm.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil { logger.Error("Failed to remove allowed IP %s: %v", subnet, err) } } @@ -139,14 +157,14 @@ func (o *Olm) handleWgPeerUpdateData(msg websocket.WSMessage) { // This ensures that if an old and new alias share the same IP, the IP won't be // temporarily removed from the allowed IPs list for _, alias := range updateSubnetsData.NewAliases { - if err := o.peerManager.AddAlias(updateSubnetsData.SiteId, alias); err != nil { + if err := pm.AddAlias(updateSubnetsData.SiteId, alias); err != nil { logger.Error("Failed to add alias %s: %v", alias.Alias, err) } } // Remove old aliases after new ones are added for _, alias := range updateSubnetsData.OldAliases { - if err := o.peerManager.RemoveAlias(updateSubnetsData.SiteId, alias.Alias); err != nil { + if err := pm.RemoveAlias(updateSubnetsData.SiteId, alias.Alias); err != nil { logger.Error("Failed to remove alias %s: %v", alias.Alias, err) } } @@ -163,7 +181,8 @@ func (o *Olm) handleSync(msg websocket.WSMessage) { return } - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Warn("Peer manager not initialized, ignoring sync request") return } @@ -190,7 +209,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) { } // Get all current peers - currentPeers := o.peerManager.GetAllPeers() + currentPeers := pm.GetAllPeers() currentPeerMap := make(map[int]peers.SiteConfig) for _, peer := range currentPeers { currentPeerMap[peer.SiteId] = peer @@ -200,7 +219,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) { for siteId := range currentPeerMap { if _, exists := expectedPeers[siteId]; !exists { logger.Info("Sync: Removing peer for site %d (no longer in expected config)", siteId) - if err := o.peerManager.RemovePeer(siteId); err != nil { + if err := pm.RemovePeer(siteId); err != nil { logger.Error("Sync: Failed to remove peer %d: %v", siteId, err) } else { // Remove any exit nodes associated with this peer from hole punching @@ -301,7 +320,7 @@ func (o *Olm) handleSync(msg websocket.WSMessage) { siteConfig.Aliases = expectedSite.Aliases } - if err := o.peerManager.UpdatePeer(siteConfig); err != nil { + if err := pm.UpdatePeer(siteConfig); err != nil { logger.Error("Sync: Failed to update peer %d: %v", siteId, err) } else { // If the endpoint changed, trigger holepunch to refresh NAT mappings diff --git a/olm/olm.go b/olm/olm.go index 5705ddc..07d144f 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -47,6 +47,7 @@ type Olm struct { websocket *websocket.Client holePunchManager *holepunch.Manager peerManager *peers.PeerManager + peerManagerMu sync.RWMutex // Power mode management currentPowerMode string powerModeMu sync.Mutex @@ -76,6 +77,15 @@ type Olm struct { tunnelWg sync.WaitGroup } +// getPeerManager safely returns the current peerManager under a read-lock. +// Callers must check the returned value for nil before using it. +func (o *Olm) getPeerManager() *peers.PeerManager { + o.peerManagerMu.RLock() + pm := o.peerManager + o.peerManagerMu.RUnlock() + return pm +} + // initTunnelInfo creates the shared UDP socket and holepunch manager. // This is used during initial tunnel setup and when switching organizations. func (o *Olm) initTunnelInfo(clientID string) error { @@ -591,10 +601,12 @@ func (o *Olm) Close() { } // Close() also calls Stop() internally + o.peerManagerMu.Lock() if o.peerManager != nil { o.peerManager.Close() o.peerManager = nil } + o.peerManagerMu.Unlock() if o.uapiListener != nil { _ = o.uapiListener.Close() @@ -806,14 +818,14 @@ func (o *Olm) SetPowerMode(mode string) error { lowPowerInterval := 10 * time.Minute - if o.peerManager != nil { - peerMonitor := o.peerManager.GetPeerMonitor() + if pm := o.getPeerManager(); pm != nil { + peerMonitor := pm.GetPeerMonitor() if peerMonitor != nil { peerMonitor.SetPeerInterval(lowPowerInterval, lowPowerInterval) peerMonitor.SetPeerHolepunchInterval(lowPowerInterval, lowPowerInterval) logger.Info("Set monitoring intervals to 10 minutes for low power mode") } - o.peerManager.UpdateAllPeersPersistentKeepalive(0) // disable + pm.UpdateAllPeersPersistentKeepalive(0) // disable } if o.holePunchManager != nil { @@ -858,14 +870,14 @@ func (o *Olm) SetPowerMode(mode string) error { } // Restore intervals and reconnect websocket - if o.peerManager != nil { - peerMonitor := o.peerManager.GetPeerMonitor() + if pm := o.getPeerManager(); pm != nil { + peerMonitor := pm.GetPeerMonitor() if peerMonitor != nil { peerMonitor.ResetPeerHolepunchInterval() peerMonitor.ResetPeerInterval() } - o.peerManager.UpdateAllPeersPersistentKeepalive(5) + pm.UpdateAllPeersPersistentKeepalive(5) } if o.holePunchManager != nil { diff --git a/olm/peer.go b/olm/peer.go index 7ec3b95..9c1c165 100644 --- a/olm/peer.go +++ b/olm/peer.go @@ -20,7 +20,8 @@ func (o *Olm) handleWgPeerAdd(msg websocket.WSMessage) { return } - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Debug("Ignoring add-peer message: peerManager is nil (shutdown in progress)") return } @@ -64,7 +65,7 @@ func (o *Olm) handleWgPeerAdd(msg websocket.WSMessage) { _ = o.holePunchManager.TriggerHolePunch() // Trigger immediate hole punch attempt so that if the peer decides to relay we have already punched close to when we need it - if err := o.peerManager.AddPeer(siteConfigMsg.SiteConfig); err != nil { + if err := pm.AddPeer(siteConfigMsg.SiteConfig); err != nil { logger.Error("Failed to add peer: %v", err) return } @@ -81,7 +82,8 @@ func (o *Olm) handleWgPeerRemove(msg websocket.WSMessage) { return } - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Debug("Ignoring remove-peer message: peerManager is nil (shutdown in progress)") return } @@ -98,7 +100,7 @@ func (o *Olm) handleWgPeerRemove(msg websocket.WSMessage) { return } - if err := o.peerManager.RemovePeer(removeData.SiteId); err != nil { + if err := pm.RemovePeer(removeData.SiteId); err != nil { logger.Error("Failed to remove peer: %v", err) return } @@ -123,7 +125,8 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) { return } - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Debug("Ignoring update-peer message: peerManager is nil (shutdown in progress)") return } @@ -141,7 +144,7 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) { } // Get existing peer from PeerManager - existingPeer, exists := o.peerManager.GetPeer(updateData.SiteId) + existingPeer, exists := pm.GetPeer(updateData.SiteId) if !exists { logger.Warn("Peer with site ID %d not found", updateData.SiteId) return @@ -169,7 +172,7 @@ func (o *Olm) handleWgPeerUpdate(msg websocket.WSMessage) { siteConfig.RemoteSubnets = updateData.RemoteSubnets } - if err := o.peerManager.UpdatePeer(siteConfig); err != nil { + if err := pm.UpdatePeer(siteConfig); err != nil { logger.Error("Failed to update peer: %v", err) return } @@ -188,7 +191,8 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) { logger.Debug("Received relay-peer message: %v", msg.Data) // Check if peerManager is still valid (may be nil during shutdown) - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Debug("Ignoring relay message: peerManager is nil (shutdown in progress)") return } @@ -208,7 +212,7 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) { return } - if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { + if monitor := pm.GetPeerMonitor(); monitor != nil { monitor.CancelRelaySend(relayData.ChainId) } @@ -222,14 +226,15 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) { // Update HTTP server to mark this peer as using relay o.apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.RelayEndpoint, true) - o.peerManager.RelayPeer(relayData.SiteId, primaryRelay, relayData.RelayPort) + pm.RelayPeer(relayData.SiteId, primaryRelay, relayData.RelayPort) } func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) { logger.Debug("Received unrelay-peer message: %v", msg.Data) // Check if peerManager is still valid (may be nil during shutdown) - if o.peerManager == nil { + pm := o.getPeerManager() + if pm == nil { logger.Debug("Ignoring unrelay message: peerManager is nil (shutdown in progress)") return } @@ -249,7 +254,7 @@ func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) { return } - if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { + if monitor := pm.GetPeerMonitor(); monitor != nil { monitor.CancelRelaySend(relayData.ChainId) } @@ -262,7 +267,7 @@ func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) { // Update HTTP server to mark this peer as using relay o.apiServer.UpdatePeerRelayStatus(relayData.SiteId, relayData.Endpoint, false) - o.peerManager.UnRelayPeer(relayData.SiteId, primaryRelay) + pm.UnRelayPeer(relayData.SiteId, primaryRelay) } func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) { @@ -317,7 +322,12 @@ func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) { } // Get existing peer from PeerManager - _, exists := o.peerManager.GetPeer(handshakeData.SiteId) + pm := o.getPeerManager() + if pm == nil { + logger.Debug("Ignoring peer-handshake message: peerManager is nil (shutdown in progress)") + return + } + _, exists := pm.GetPeer(handshakeData.SiteId) if exists { logger.Warn("Peer with site ID %d already added", handshakeData.SiteId) return