mirror of
https://github.com/fosrl/olm.git
synced 2026-02-07 21:46:40 +00:00
Fixing small things
This commit is contained in:
@@ -190,6 +190,12 @@ func (s *API) UpdatePeerStatus(siteID int, connected bool, rtt time.Duration, en
|
||||
status.IsRelay = isRelay
|
||||
}
|
||||
|
||||
func (s *API) RemovePeerStatus(siteID int) { // remove the peer from the status map
|
||||
s.statusMu.Lock()
|
||||
defer s.statusMu.Unlock()
|
||||
delete(s.peerStatuses, siteID)
|
||||
}
|
||||
|
||||
// SetConnectionStatus sets the overall connection status
|
||||
func (s *API) SetConnectionStatus(isConnected bool) {
|
||||
s.statusMu.Lock()
|
||||
|
||||
15
main.go
15
main.go
@@ -210,13 +210,14 @@ func runOlmMainWithArgs(ctx context.Context, cancel context.CancelFunc, signalCt
|
||||
|
||||
// Create a new olm.Config struct and copy values from the main config
|
||||
olmConfig := olm.GlobalConfig{
|
||||
LogLevel: config.LogLevel,
|
||||
EnableAPI: config.EnableAPI,
|
||||
HTTPAddr: config.HTTPAddr,
|
||||
SocketPath: config.SocketPath,
|
||||
Version: config.Version,
|
||||
Agent: "olm-cli",
|
||||
OnExit: cancel, // Pass cancel function directly to trigger shutdown
|
||||
LogLevel: config.LogLevel,
|
||||
EnableAPI: config.EnableAPI,
|
||||
HTTPAddr: config.HTTPAddr,
|
||||
SocketPath: config.SocketPath,
|
||||
Version: config.Version,
|
||||
Agent: "olm-cli",
|
||||
OnExit: cancel, // Pass cancel function directly to trigger shutdown
|
||||
OnTerminated: cancel,
|
||||
}
|
||||
|
||||
olm.Init(ctx, olmConfig)
|
||||
|
||||
19
olm/olm.go
19
olm/olm.go
@@ -577,6 +577,11 @@ func StartTunnel(config TunnelConfig) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := peerManager.GetPeer(addSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for removing remote subnets and aliases", addSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
|
||||
// Add new subnets
|
||||
for _, subnet := range addSubnetsData.RemoteSubnets {
|
||||
if err := peerManager.AddRemoteSubnet(addSubnetsData.SiteId, subnet); err != nil {
|
||||
@@ -608,6 +613,11 @@ func StartTunnel(config TunnelConfig) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := peerManager.GetPeer(removeSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for removing remote subnets and aliases", removeSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove subnets
|
||||
for _, subnet := range removeSubnetsData.RemoteSubnets {
|
||||
if err := peerManager.RemoveRemoteSubnet(removeSubnetsData.SiteId, subnet); err != nil {
|
||||
@@ -639,6 +649,11 @@ func StartTunnel(config TunnelConfig) {
|
||||
return
|
||||
}
|
||||
|
||||
if _, exists := peerManager.GetPeer(updateSubnetsData.SiteId); !exists {
|
||||
logger.Debug("Peer %d not found for removing remote subnets and aliases", updateSubnetsData.SiteId)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove old subnets
|
||||
for _, subnet := range updateSubnetsData.OldRemoteSubnets {
|
||||
if err := peerManager.RemoveRemoteSubnet(updateSubnetsData.SiteId, subnet); err != nil {
|
||||
@@ -801,6 +816,10 @@ func StartTunnel(config TunnelConfig) {
|
||||
}
|
||||
})
|
||||
|
||||
olm.RegisterHandler("pong", func(msg websocket.WSMessage) {
|
||||
logger.Debug("Received pong message")
|
||||
})
|
||||
|
||||
olm.OnConnect(func() error {
|
||||
logger.Info("Websocket Connected")
|
||||
|
||||
|
||||
@@ -221,6 +221,8 @@ func (pm *PeerManager) RemovePeer(siteId int) error {
|
||||
pm.peerMonitor.RemovePeer(siteId)
|
||||
logger.Info("Stopped monitoring for site %d", siteId)
|
||||
|
||||
pm.APIServer.RemovePeerStatus(siteId)
|
||||
|
||||
delete(pm.peers, siteId)
|
||||
return nil
|
||||
}
|
||||
@@ -360,10 +362,9 @@ func (pm *PeerManager) UpdatePeer(siteConfig SiteConfig) error {
|
||||
|
||||
pm.peerMonitor.UpdateHolepunchEndpoint(siteConfig.SiteId, siteConfig.Endpoint)
|
||||
|
||||
// Preserve the relay endpoint if the peer is relayed
|
||||
if pm.peerMonitor != nil && pm.peerMonitor.IsPeerRelayed(siteConfig.SiteId) && oldPeer.RelayEndpoint != "" {
|
||||
siteConfig.RelayEndpoint = oldPeer.RelayEndpoint
|
||||
}
|
||||
monitorAddress := strings.Split(siteConfig.ServerIP, "/")[0]
|
||||
monitorPeer := net.JoinHostPort(monitorAddress, strconv.Itoa(int(siteConfig.ServerPort+1))) // +1 for the monitor port
|
||||
pm.peerMonitor.UpdatePeerEndpoint(siteConfig.SiteId, monitorPeer) // +1 for monitor port
|
||||
|
||||
pm.peers[siteConfig.SiteId] = siteConfig
|
||||
return nil
|
||||
|
||||
@@ -188,6 +188,23 @@ func (pm *PeerMonitor) UpdateHolepunchEndpoint(siteID int, endpoint string) {
|
||||
pm.holepunchEndpoints[siteID] = endpoint
|
||||
}
|
||||
|
||||
// UpdatePeerEndpoint updates the monitor endpoint for a peer
|
||||
func (pm *PeerMonitor) UpdatePeerEndpoint(siteID int, monitorPeer string) {
|
||||
pm.mutex.Lock()
|
||||
defer pm.mutex.Unlock()
|
||||
|
||||
client, exists := pm.monitors[siteID]
|
||||
if !exists {
|
||||
logger.Warn("Cannot update endpoint: peer %d not found in monitor", siteID)
|
||||
return
|
||||
}
|
||||
|
||||
// Update the client's server address
|
||||
client.UpdateServerAddr(monitorPeer)
|
||||
|
||||
logger.Info("Updated monitor endpoint for site %d to %s", siteID, monitorPeer)
|
||||
}
|
||||
|
||||
// removePeerUnlocked stops monitoring a peer and removes it from the monitor
|
||||
// This function assumes the mutex is already held by the caller
|
||||
func (pm *PeerMonitor) removePeerUnlocked(siteID int) {
|
||||
@@ -417,6 +434,12 @@ func (pm *PeerMonitor) checkHolepunchEndpoints() {
|
||||
result := pm.holepunchTester.TestEndpoint(endpoint, timeout)
|
||||
|
||||
pm.mutex.Lock()
|
||||
// Check if peer was removed while we were testing
|
||||
if _, stillExists := pm.holepunchEndpoints[siteID]; !stillExists {
|
||||
pm.mutex.Unlock()
|
||||
continue // Peer was removed, skip processing
|
||||
}
|
||||
|
||||
previousStatus, exists := pm.holepunchStatus[siteID]
|
||||
pm.holepunchStatus[siteID] = result.Success
|
||||
isRelayed := pm.relayedPeers[siteID]
|
||||
|
||||
@@ -74,6 +74,20 @@ func (c *Client) SetMaxAttempts(attempts int) {
|
||||
c.maxAttempts = attempts
|
||||
}
|
||||
|
||||
// UpdateServerAddr updates the server address and resets the connection
|
||||
func (c *Client) UpdateServerAddr(serverAddr string) {
|
||||
c.connLock.Lock()
|
||||
defer c.connLock.Unlock()
|
||||
|
||||
// Close existing connection if any
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
}
|
||||
|
||||
c.serverAddr = serverAddr
|
||||
}
|
||||
|
||||
// Close cleans up client resources
|
||||
func (c *Client) Close() {
|
||||
c.StopMonitor()
|
||||
@@ -143,14 +157,14 @@ func (c *Client) TestConnection(ctx context.Context) (bool, time.Duration) {
|
||||
return false, 0
|
||||
}
|
||||
|
||||
// logger.Debug("Attempting to send monitor packet to %s", c.serverAddr)
|
||||
logger.Debug("Attempting to send monitor packet to %s", c.serverAddr)
|
||||
_, err := c.conn.Write(packet)
|
||||
if err != nil {
|
||||
c.connLock.Unlock()
|
||||
logger.Info("Error sending packet: %v", err)
|
||||
continue
|
||||
}
|
||||
// logger.Debug("Successfully sent monitor packet")
|
||||
logger.Debug("Successfully sent monitor packet")
|
||||
|
||||
// Set read deadline
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
|
||||
|
||||
@@ -646,9 +646,7 @@ func (c *Client) readPumpWithDisconnectDetection() {
|
||||
|
||||
c.handlersMux.RLock()
|
||||
if handler, ok := c.handlers[msg.Type]; ok {
|
||||
logger.Debug("***********************************Running handler for message type: %s", msg.Type)
|
||||
handler(msg)
|
||||
logger.Debug("***********************************Finished handler for message type: %s", msg.Type)
|
||||
}
|
||||
c.handlersMux.RUnlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user