From 809dbe77de0042d038f86b2a5fbedb53a3af8e61 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 6 Mar 2026 15:27:03 -0800 Subject: [PATCH] Make chainId in relay message bckwd compat --- olm/peer.go | 21 ++++++++------------- peers/monitor/monitor.go | 34 +++++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 24 deletions(-) diff --git a/olm/peer.go b/olm/peer.go index c611921..9f02bb2 100644 --- a/olm/peer.go +++ b/olm/peer.go @@ -51,7 +51,6 @@ func (o *Olm) handleWgPeerAdd(msg websocket.WSMessage) { return } - logger.Info("Successfully added peer for site %d", siteConfigMsg.SiteId) } @@ -181,10 +180,8 @@ func (o *Olm) handleWgPeerRelay(msg websocket.WSMessage) { return } - if relayData.ChainId != "" { - if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { - monitor.CancelRelaySend(relayData.ChainId) - } + if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { + monitor.CancelRelaySend(relayData.ChainId) } primaryRelay, err := util.ResolveDomain(relayData.RelayEndpoint) @@ -223,10 +220,8 @@ func (o *Olm) handleWgPeerUnrelay(msg websocket.WSMessage) { return } - if relayData.ChainId != "" { - if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { - monitor.CancelRelaySend(relayData.ChainId) - } + if monitor := o.peerManager.GetPeerMonitor(); monitor != nil { + monitor.CancelRelaySend(relayData.ChainId) } primaryRelay, err := util.ResolveDomain(relayData.Endpoint) @@ -256,8 +251,8 @@ func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) { } var handshakeData struct { - SiteId int `json:"siteId"` - ChainId string `json:"chainId"` + SiteId int `json:"siteId"` + ChainId string `json:"chainId"` ExitNode struct { PublicKey string `json:"publicKey"` Endpoint string `json:"endpoint"` @@ -269,7 +264,7 @@ func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) { logger.Error("Error unmarshaling handshake data: %v", err) return } - + // Stop the peer init sender for this chain, if any if handshakeData.ChainId != "" { o.peerSendMu.Lock() @@ -278,7 +273,7 @@ func (o *Olm) handleWgPeerHolepunchAddSite(msg websocket.WSMessage) { delete(o.stopPeerInits, handshakeData.ChainId) } o.peerSendMu.Unlock() - } + } // Get existing peer from PeerManager _, exists := o.peerManager.GetPeer(handshakeData.SiteId) diff --git a/peers/monitor/monitor.go b/peers/monitor/monitor.go index 1296fef..6b0d557 100644 --- a/peers/monitor/monitor.go +++ b/peers/monitor/monitor.go @@ -33,13 +33,13 @@ type PeerMonitor struct { monitors map[int]*Client mutex sync.Mutex running bool - timeout time.Duration + timeout time.Duration maxAttempts int wsClient *websocket.Client // Relay sender tracking - relaySends map[string]func() - relaySendMu sync.Mutex + relaySends map[string]func() + relaySendMu sync.Mutex // Netstack fields middleDev *middleDevice.MiddleDevice @@ -53,13 +53,13 @@ type PeerMonitor struct { nsWg sync.WaitGroup // Holepunch testing fields - sharedBind *bind.SharedBind - holepunchTester *holepunch.HolepunchTester - holepunchTimeout time.Duration - holepunchEndpoints map[int]string // siteID -> endpoint for holepunch testing - holepunchStatus map[int]bool // siteID -> connected status - holepunchStopChan chan struct{} - holepunchUpdateChan chan struct{} + sharedBind *bind.SharedBind + holepunchTester *holepunch.HolepunchTester + holepunchTimeout time.Duration + holepunchEndpoints map[int]string // siteID -> endpoint for holepunch testing + holepunchStatus map[int]bool // siteID -> connected status + holepunchStopChan chan struct{} + holepunchUpdateChan chan struct{} // Relay tracking fields relayedPeers map[int]bool // siteID -> whether the peer is currently relayed @@ -456,10 +456,22 @@ func (pm *PeerMonitor) sendUnRelay(siteID int) error { } // CancelRelaySend stops the interval sender for the given chainId, if one exists. +// If chainId is empty, all active relay senders are stopped. func (pm *PeerMonitor) CancelRelaySend(chainId string) { pm.relaySendMu.Lock() defer pm.relaySendMu.Unlock() + if chainId == "" { + for id, stop := range pm.relaySends { + if stop != nil { + stop() + } + delete(pm.relaySends, id) + } + logger.Info("Cancelled all relay senders") + return + } + if stop, ok := pm.relaySends[chainId]; ok { stop() delete(pm.relaySends, chainId) @@ -567,7 +579,7 @@ func (pm *PeerMonitor) runHolepunchMonitor() { pm.holepunchCurrentInterval = pm.holepunchMinInterval currentInterval := pm.holepunchCurrentInterval pm.mutex.Unlock() - + timer.Reset(currentInterval) logger.Debug("Holepunch monitor interval updated, reset to %v", currentInterval) case <-timer.C: