mirror of
https://github.com/fosrl/olm.git
synced 2026-02-07 21:46:40 +00:00
@@ -221,6 +221,8 @@ func (pm *PeerManager) RemovePeer(siteId int) error {
|
||||
pm.peerMonitor.RemovePeer(siteId)
|
||||
logger.Info("Stopped monitoring for site %d", siteId)
|
||||
|
||||
pm.APIServer.RemovePeerStatus(siteId)
|
||||
|
||||
delete(pm.peers, siteId)
|
||||
return nil
|
||||
}
|
||||
@@ -360,10 +362,9 @@ func (pm *PeerManager) UpdatePeer(siteConfig SiteConfig) error {
|
||||
|
||||
pm.peerMonitor.UpdateHolepunchEndpoint(siteConfig.SiteId, siteConfig.Endpoint)
|
||||
|
||||
// Preserve the relay endpoint if the peer is relayed
|
||||
if pm.peerMonitor != nil && pm.peerMonitor.IsPeerRelayed(siteConfig.SiteId) && oldPeer.RelayEndpoint != "" {
|
||||
siteConfig.RelayEndpoint = oldPeer.RelayEndpoint
|
||||
}
|
||||
monitorAddress := strings.Split(siteConfig.ServerIP, "/")[0]
|
||||
monitorPeer := net.JoinHostPort(monitorAddress, strconv.Itoa(int(siteConfig.ServerPort+1))) // +1 for the monitor port
|
||||
pm.peerMonitor.UpdatePeerEndpoint(siteConfig.SiteId, monitorPeer) // +1 for monitor port
|
||||
|
||||
pm.peers[siteConfig.SiteId] = siteConfig
|
||||
return nil
|
||||
|
||||
@@ -188,6 +188,23 @@ func (pm *PeerMonitor) UpdateHolepunchEndpoint(siteID int, endpoint string) {
|
||||
pm.holepunchEndpoints[siteID] = endpoint
|
||||
}
|
||||
|
||||
// UpdatePeerEndpoint updates the monitor endpoint for a peer
|
||||
func (pm *PeerMonitor) UpdatePeerEndpoint(siteID int, monitorPeer string) {
|
||||
pm.mutex.Lock()
|
||||
defer pm.mutex.Unlock()
|
||||
|
||||
client, exists := pm.monitors[siteID]
|
||||
if !exists {
|
||||
logger.Warn("Cannot update endpoint: peer %d not found in monitor", siteID)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the client's server address
|
||||
client.UpdateServerAddr(monitorPeer)
|
||||
|
||||
logger.Info("Updated monitor endpoint for site %d to %s", siteID, monitorPeer)
|
||||
}
|
||||
|
||||
// removePeerUnlocked stops monitoring a peer and removes it from the monitor
|
||||
// This function assumes the mutex is already held by the caller
|
||||
func (pm *PeerMonitor) removePeerUnlocked(siteID int) {
|
||||
@@ -417,6 +434,12 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() {
|
||||
result := pm.holepunchTester.TestEndpoint(endpoint, timeout)
|
||||
|
||||
pm.mutex.Lock()
|
||||
// Check if peer was removed while we were testing
|
||||
if _, stillExists := pm.holepunchEndpoints[siteID]; !stillExists {
|
||||
pm.mutex.Unlock()
|
||||
continue // Peer was removed, skip processing
|
||||
}
|
||||
|
||||
previousStatus, exists := pm.holepunchStatus[siteID]
|
||||
pm.holepunchStatus[siteID] = result.Success
|
||||
isRelayed := pm.relayedPeers[siteID]
|
||||
|
||||
@@ -74,6 +74,20 @@ func (c *Client) SetMaxAttempts(attempts int) {
|
||||
c.maxAttempts = attempts
|
||||
}
|
||||
|
||||
// UpdateServerAddr updates the server address and resets the connection
|
||||
func (c *Client) UpdateServerAddr(serverAddr string) {
|
||||
c.connLock.Lock()
|
||||
defer c.connLock.Unlock()
|
||||
|
||||
// Close existing connection if any
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
}
|
||||
|
||||
c.serverAddr = serverAddr
|
||||
}
|
||||
|
||||
// Close cleans up client resources
|
||||
func (c *Client) Close() {
|
||||
c.StopMonitor()
|
||||
@@ -143,14 +157,14 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// logger.Debug("Attempting to send monitor packet to %s", c.serverAddr)
|
||||
logger.Debug("Attempting to send monitor packet to %s", c.serverAddr)
|
||||
_, err := c.conn.Write(packet)
|
||||
if err != nil {
|
||||
c.connLock.Unlock()
|
||||
logger.Info("Error sending packet: %v", err)
|
||||
continue
|
||||
}
|
||||
// logger.Debug("Successfully sent monitor packet")
|
||||
logger.Debug("Successfully sent monitor packet")
|
||||
|
||||
// Set read deadline
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
|
||||
|
||||
Reference in New Issue
Block a user