From b76259bc31aba4782c5de8df2ae699f6e5c2587a Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 24 Dec 2025 10:06:25 -0500 Subject: [PATCH] Add sync message Former-commit-id: d01f180941c6c854f73274c86c281260bd653875 --- go.sum | 3 -- olm/olm.go | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++++ olm/util.go | 43 +++++++++++++++ 3 files changed, 190 insertions(+), 3 deletions(-) diff --git a/go.sum b/go.sum index 7e94e2a..9bf88e2 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,7 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= -<<<<<<< HEAD -======= github.com/fosrl/newt v0.0.0-20251222020104-a21a8e90fa01 h1:VpuI42l4enih//6IFFQDln/B7WukfMePxIRIpXsNe/0= github.com/fosrl/newt v0.0.0-20251222020104-a21a8e90fa01/go.mod h1:pol958CEs0nQmo/35Ltv0CGksheIKCS2hoNvdTVLEcI= ->>>>>>> dev github.com/godbus/dbus/v5 v5.2.0 h1:3WexO+U+yg9T70v9FdHr9kCxYlazaAXUhx2VMkbfax8= github.com/godbus/dbus/v5 v5.2.0/go.mod h1:3AAv2+hPq5rdnr5txxxRwiGjPXamgoIHgz9FPBfOp3c= github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg= diff --git a/olm/olm.go b/olm/olm.go index f84ee4f..4cbb391 100644 --- a/olm/olm.go +++ b/olm/olm.go @@ -453,6 +453,153 @@ func StartTunnel(config TunnelConfig) { logger.Info("WireGuard device created.") }) + // Handler for syncing peer configuration - reconciles expected state with actual state + olm.RegisterHandler("olm/sync", func(msg websocket.WSMessage) { + logger.Debug("Received sync message: %v", msg.Data) + + if !connected { + logger.Warn("Not connected, ignoring sync request") + return + } + + if peerManager == nil { + logger.Warn("Peer manager not initialized, ignoring sync request") + return + } + + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling sync data: %v", err) + return + } + + var wgData WgData + if err := json.Unmarshal(jsonData, &wgData); err != nil { + logger.Error("Error unmarshaling sync data: %v", err) + return + } + + // Build a map of expected peers from the incoming data + expectedPeers := make(map[int]peers.SiteConfig) + for _, site := range wgData.Sites { + expectedPeers[site.SiteId] = site + } + + // Get all current peers + currentPeers := peerManager.GetAllPeers() + currentPeerMap := make(map[int]peers.SiteConfig) + for _, peer := range currentPeers { + currentPeerMap[peer.SiteId] = peer + } + + // Find peers to remove (in current but not in expected) + for siteId := range currentPeerMap { + if _, exists := expectedPeers[siteId]; !exists { + logger.Info("Sync: Removing peer for site %d (no longer in expected config)", siteId) + if err := peerManager.RemovePeer(siteId); err != nil { + logger.Error("Sync: Failed to remove peer %d: %v", siteId, err) + } else { + // Remove any exit nodes associated with this peer from hole punching + if holePunchManager != nil { + removed := holePunchManager.RemoveExitNodesByPeer(siteId) + if removed > 0 { + logger.Info("Sync: Removed %d exit nodes associated with peer %d from hole punch rotation", removed, siteId) + } + } + } + } + } + + // Find peers to add (in expected but not in current) and peers to update + for siteId, expectedSite := range expectedPeers { + if _, exists := currentPeerMap[siteId]; !exists { + // New peer - add it using the add flow (with holepunch) + logger.Info("Sync: Adding new peer for site %d", siteId) + + // Trigger immediate hole punch attempt so that if the peer decides to relay we have already punched close to when we need it + holePunchManager.TriggerHolePunch() + + // TODO: do we need to send the message to the cloud to add the peer that way? + if err := peerManager.AddPeer(expectedSite); err != nil { + logger.Error("Sync: Failed to add peer %d: %v", siteId, err) + } else { + logger.Info("Sync: Successfully added peer for site %d", siteId) + } + } else { + // Existing peer - check if update is needed + currentSite := currentPeerMap[siteId] + needsUpdate := false + + // Check if any fields have changed + if expectedSite.Endpoint != "" && expectedSite.Endpoint != currentSite.Endpoint { + needsUpdate = true + } + if expectedSite.RelayEndpoint != "" && expectedSite.RelayEndpoint != currentSite.RelayEndpoint { + needsUpdate = true + } + if expectedSite.PublicKey != "" && expectedSite.PublicKey != currentSite.PublicKey { + needsUpdate = true + } + if expectedSite.ServerIP != "" && expectedSite.ServerIP != currentSite.ServerIP { + needsUpdate = true + } + if expectedSite.ServerPort != 0 && expectedSite.ServerPort != currentSite.ServerPort { + needsUpdate = true + } + // Check remote subnets + if expectedSite.RemoteSubnets != nil && !slicesEqual(expectedSite.RemoteSubnets, currentSite.RemoteSubnets) { + needsUpdate = true + } + // Check aliases + if expectedSite.Aliases != nil && !aliasesEqual(expectedSite.Aliases, currentSite.Aliases) { + needsUpdate = true + } + + if needsUpdate { + logger.Info("Sync: Updating peer for site %d", siteId) + + // Merge expected data with current data + siteConfig := currentSite + if expectedSite.Endpoint != "" { + siteConfig.Endpoint = expectedSite.Endpoint + } + if expectedSite.RelayEndpoint != "" { + siteConfig.RelayEndpoint = expectedSite.RelayEndpoint + } + if expectedSite.PublicKey != "" { + siteConfig.PublicKey = expectedSite.PublicKey + } + if expectedSite.ServerIP != "" { + siteConfig.ServerIP = expectedSite.ServerIP + } + if expectedSite.ServerPort != 0 { + siteConfig.ServerPort = expectedSite.ServerPort + } + if expectedSite.RemoteSubnets != nil { + siteConfig.RemoteSubnets = expectedSite.RemoteSubnets + } + if expectedSite.Aliases != nil { + siteConfig.Aliases = expectedSite.Aliases + } + + if err := peerManager.UpdatePeer(siteConfig); err != nil { + logger.Error("Sync: Failed to update peer %d: %v", siteId, err) + } else { + // If the endpoint changed, trigger holepunch to refresh NAT mappings + if expectedSite.Endpoint != "" && expectedSite.Endpoint != currentSite.Endpoint { + logger.Info("Sync: Endpoint changed for site %d, triggering holepunch to refresh NAT mappings", siteId) + holePunchManager.TriggerHolePunch() + holePunchManager.ResetInterval() + } + logger.Info("Sync: Successfully updated peer for site %d", siteId) + } + } + } + } + + logger.Info("Sync completed: processed %d expected peers, had %d current peers", len(expectedPeers), len(currentPeers)) + }) + olm.RegisterHandler("olm/wg/peer/update", func(msg websocket.WSMessage) { logger.Debug("Received update-peer message: %v", msg.Data) diff --git a/olm/util.go b/olm/util.go index 6bfd171..d138755 100644 --- a/olm/util.go +++ b/olm/util.go @@ -5,6 +5,7 @@ import ( "github.com/fosrl/newt/logger" "github.com/fosrl/newt/network" + "github.com/fosrl/olm/peers" "github.com/fosrl/olm/websocket" ) @@ -53,3 +54,45 @@ func GetNetworkSettingsJSON() (string, error) { func GetNetworkSettingsIncrementor() int { return network.GetIncrementor() } + +// slicesEqual compares two string slices for equality (order-independent) +func slicesEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + // Create a map to count occurrences in slice a + counts := make(map[string]int) + for _, v := range a { + counts[v]++ + } + // Check if slice b has the same elements + for _, v := range b { + counts[v]-- + if counts[v] < 0 { + return false + } + } + return true +} + +// aliasesEqual compares two Alias slices for equality (order-independent) +func aliasesEqual(a, b []peers.Alias) bool { + if len(a) != len(b) { + return false + } + // Create a map to count occurrences in slice a (using alias+address as key) + counts := make(map[string]int) + for _, v := range a { + key := v.Alias + "|" + v.AliasAddress + counts[key]++ + } + // Check if slice b has the same elements + for _, v := range b { + key := v.Alias + "|" + v.AliasAddress + counts[key]-- + if counts[key] < 0 { + return false + } + } + return true +}