From e898d4454f7a3b8f96a2740f919fbae952a8e618 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 3 Dec 2025 15:14:08 -0500 Subject: [PATCH] Fixing small things --- api/api.go | 6 ++++++ main.go | 15 ++++++++------- olm/olm.go | 19 +++++++++++++++++++ peers/manager.go | 9 +++++---- peers/monitor/monitor.go | 23 +++++++++++++++++++++++ peers/monitor/wgtester.go | 18 ++++++++++++++++-- websocket/client.go | 2 -- 7 files changed, 77 insertions(+), 15 deletions(-) diff --git a/api/api.go b/api/api.go index f6c9f84..eb1c6a6 100644 --- a/api/api.go +++ b/api/api.go @@ -190,6 +190,12 @@ func (s *API) UpdatePeerStatus(siteID int, connected bool, rtt time.Duration, en status.IsRelay = isRelay } +func (s *API) RemovePeerStatus(siteID int) { // remove the peer from the status map + s.statusMu.Lock() + defer s.statusMu.Unlock() + delete(s.peerStatuses, siteID) +} + // SetConnectionStatus sets the overall connection status func (s *API) SetConnectionStatus(isConnected bool) { s.statusMu.Lock() diff --git a/main.go b/main.go index 170a976..c4c89db 100644 --- a/main.go +++ b/main.go @@ -210,13 +210,14 @@ func runOlmMainWithArgs(ctx context.Context, cancel context.CancelFunc, signalCt // Create a new olm.Config struct and copy values from the main config olmConfig := olm.GlobalConfig{ - LogLevel: config.LogLevel, - EnableAPI: config.EnableAPI, - HTTPAddr: config.HTTPAddr, - SocketPath: config.SocketPath, - Version: config.Version, - Agent: "olm-cli", - OnExit: cancel, // Pass cancel function directly to trigger shutdown + LogLevel: config.LogLevel, + EnableAPI: config.EnableAPI, + HTTPAddr: config.HTTPAddr, + SocketPath: config.SocketPath, + Version: config.Version, + Agent: "olm-cli", + OnExit: cancel, // Pass cancel function directly to trigger shutdown + OnTerminated: cancel, } olm.Init(ctx, olmConfig) diff --git a/olm/olm.go b/olm/olm.go index 67c6880..7b9b9e1 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -577,6 +577,11 @@ func StartTunnel(config TunnelConfig) { return } + if _, exists := peerManager.GetPeer(addSubnetsData.SiteId); !exists { + logger.Debug("Peer %d not found for removing remote subnets and aliases", addSubnetsData.SiteId) + return + } + // Add new subnets for _, subnet := range addSubnetsData.RemoteSubnets { if err := peerManager.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil { @@ -608,6 +613,11 @@ func StartTunnel(config TunnelConfig) { return } + if _, exists := peerManager.GetPeer(removeSubnetsData.SiteId); !exists { + logger.Debug("Peer %d not found for removing remote subnets and aliases", removeSubnetsData.SiteId) + return + } + // Remove subnets for _, subnet := range removeSubnetsData.RemoteSubnets { if err := peerManager.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil { @@ -639,6 +649,11 @@ func StartTunnel(config TunnelConfig) { return } + if _, exists := peerManager.GetPeer(updateSubnetsData.SiteId); !exists { + logger.Debug("Peer %d not found for removing remote subnets and aliases", updateSubnetsData.SiteId) + return + } + // Remove old subnets for _, subnet := range updateSubnetsData.OldRemoteSubnets { if err := peerManager.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil { @@ -801,6 +816,10 @@ func StartTunnel(config TunnelConfig) { } }) + olm.RegisterHandler("pong", func(msg websocket.WSMessage) { + logger.Debug("Received pong message") + }) + olm.OnConnect(func() error { logger.Info("Websocket Connected") diff --git a/peers/manager.go b/peers/manager.go index 79a2e9d..f704f25 100644 --- a/peers/manager.go +++ b/peers/manager.go @@ -221,6 +221,8 @@ func (pm *PeerManager) RemovePeer(siteId int) error { pm.peerMonitor.RemovePeer(siteId) logger.Info("Stopped monitoring for site %d", siteId) + pm.APIServer.RemovePeerStatus(siteId) + delete(pm.peers, siteId) return nil } @@ -360,10 +362,9 @@ func (pm *PeerManager) UpdatePeer(siteConfig SiteConfig) error { pm.peerMonitor.UpdateHolepunchEndpoint(siteConfig.SiteId, siteConfig.Endpoint) - // Preserve the relay endpoint if the peer is relayed - if pm.peerMonitor != nil && pm.peerMonitor.IsPeerRelayed(siteConfig.SiteId) && oldPeer.RelayEndpoint != "" { - siteConfig.RelayEndpoint = oldPeer.RelayEndpoint - } + monitorAddress := strings.Split(siteConfig.ServerIP, "/")[0] + monitorPeer := net.JoinHostPort(monitorAddress, strconv.Itoa(int(siteConfig.ServerPort+1))) // +1 for the monitor port + pm.peerMonitor.UpdatePeerEndpoint(siteConfig.SiteId, monitorPeer) // +1 for monitor port pm.peers[siteConfig.SiteId] = siteConfig return nil diff --git a/peers/monitor/monitor.go b/peers/monitor/monitor.go index d2e1094..215ca72 100644 --- a/peers/monitor/monitor.go +++ b/peers/monitor/monitor.go @@ -188,6 +188,23 @@ func (pm *PeerMonitor) UpdateHolepunchEndpoint(siteID int, endpoint string) { pm.holepunchEndpoints[siteID] = endpoint } +// UpdatePeerEndpoint updates the monitor endpoint for a peer +func (pm *PeerMonitor) UpdatePeerEndpoint(siteID int, monitorPeer string) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + client, exists := pm.monitors[siteID] + if !exists { + logger.Warn("Cannot update endpoint: peer %d not found in monitor", siteID) + return + } + + // Update the client's server address + client.UpdateServerAddr(monitorPeer) + + logger.Info("Updated monitor endpoint for site %d to %s", siteID, monitorPeer) +} + // removePeerUnlocked stops monitoring a peer and removes it from the monitor // This function assumes the mutex is already held by the caller func (pm *PeerMonitor) removePeerUnlocked(siteID int) { @@ -417,6 +434,12 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() { result := pm.holepunchTester.TestEndpoint(endpoint, timeout) pm.mutex.Lock() + // Check if peer was removed while we were testing + if _, stillExists := pm.holepunchEndpoints[siteID]; !stillExists { + pm.mutex.Unlock() + continue // Peer was removed, skip processing + } + previousStatus, exists := pm.holepunchStatus[siteID] pm.holepunchStatus[siteID] = result.Success isRelayed := pm.relayedPeers[siteID] diff --git a/peers/monitor/wgtester.go b/peers/monitor/wgtester.go index 15bf025..6204620 100644 --- a/peers/monitor/wgtester.go +++ b/peers/monitor/wgtester.go @@ -74,6 +74,20 @@ func (c *Client) SetMaxAttempts(attempts int) { c.maxAttempts = attempts } +// UpdateServerAddr updates the server address and resets the connection +func (c *Client) UpdateServerAddr(serverAddr string) { + c.connLock.Lock() + defer c.connLock.Unlock() + + // Close existing connection if any + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + c.serverAddr = serverAddr +} + // Close cleans up client resources func (c *Client) Close() { c.StopMonitor() @@ -143,14 +157,14 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) { return false, 0 } - // logger.Debug("Attempting to send monitor packet to %s", c.serverAddr) + logger.Debug("Attempting to send monitor packet to %s", c.serverAddr) _, err := c.conn.Write(packet) if err != nil { c.connLock.Unlock() logger.Info("Error sending packet: %v", err) continue } - // logger.Debug("Successfully sent monitor packet") + logger.Debug("Successfully sent monitor packet") // Set read deadline c.conn.SetReadDeadline(time.Now().Add(c.timeout)) diff --git a/websocket/client.go b/websocket/client.go index 6c198bf..54b659a 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -646,9 +646,7 @@ func (c *Client) readPumpWithDisconnectDetection() { c.handlersMux.RLock() if handler, ok := c.handlers[msg.Type]; ok { - logger.Debug("***********************************Running handler for message type: %s", msg.Type) handler(msg) - logger.Debug("***********************************Finished handler for message type: %s", msg.Type) } c.handlersMux.RUnlock() }