diff --git a/management/internals/controllers/network_map/controller/controller.go b/management/internals/controllers/network_map/controller/controller.go index df16e1922..7f0f9bd4b 100644 --- a/management/internals/controllers/network_map/controller/controller.go +++ b/management/internals/controllers/network_map/controller/controller.go @@ -447,7 +447,9 @@ func (c *Controller) GetValidatedPeerWithMap(ctx context.Context, isRequiresAppr if c.experimentalNetworkMap(accountID) { networkMap = c.getPeerNetworkMapExp(ctx, peer.AccountID, peer.ID, approvedPeersMap, customZone, c.accountManagerMetrics) } else { - networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), c.accountManagerMetrics, account.GetActiveGroupUsers()) + resourcePolicies := account.GetResourcePoliciesMap() + routers := account.GetResourceRoutersMap() + networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, resourcePolicies, routers, c.accountManagerMetrics, account.GetActiveGroupUsers()) } proxyNetworkMap, ok := proxyNetworkMaps[peer.ID] @@ -480,12 +482,13 @@ func (c *Controller) getPeerNetworkMapExp( Network: &types.Network{}, } } + return account.GetPeerNetworkMapExp(ctx, peerId, customZone, validatedPeers, metrics) } -func (c *Controller) onPeerAddedUpdNetworkMapCache(account *types.Account, peerId string) error { +func (c *Controller) onPeersAddedUpdNetworkMapCache(account *types.Account, peerIds ...string) { c.enrichAccountFromHolder(account) - return account.OnPeerAddedUpdNetworkMapCache(peerId) + account.OnPeersAddedUpdNetworkMapCache(peerIds...) } func (c *Controller) onPeerDeletedUpdNetworkMapCache(account *types.Account, peerId string) error { @@ -537,7 +540,6 @@ func (c *Controller) enrichAccountFromHolder(account *types.Account) { if account.NetworkMapCache == nil { return } - account.NetworkMapCache.UpdateAccountPointer(account) c.holder.AddAccount(account) } @@ -715,18 +717,14 @@ func (c *Controller) OnPeersUpdated(ctx context.Context, accountID string, peerI } func (c *Controller) OnPeersAdded(ctx context.Context, accountID string, peerIDs []string) error { - for _, peerID := range peerIDs { - if c.experimentalNetworkMap(accountID) { - account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) - if err != nil { - return err - } - - err = c.onPeerAddedUpdNetworkMapCache(account, peerID) - if err != nil { - return err - } + log.WithContext(ctx).Debugf("OnPeersAdded call to add peers: %v", peerIDs) + if c.experimentalNetworkMap(accountID) { + account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID) + if err != nil { + return err } + log.WithContext(ctx).Debugf("peers are ready to be added to networkmap cache: %v", peerIDs) + c.onPeersAddedUpdNetworkMapCache(account, peerIDs...) } return c.bufferSendUpdateAccountPeers(ctx, accountID) } @@ -813,7 +811,9 @@ func (c *Controller) GetNetworkMap(ctx context.Context, peerID string) (*types.N if c.experimentalNetworkMap(peer.AccountID) { networkMap = c.getPeerNetworkMapExp(ctx, peer.AccountID, peerID, validatedPeers, customZone, nil) } else { - networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, validatedPeers, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), nil, account.GetActiveGroupUsers()) + resourcePolicies := account.GetResourcePoliciesMap() + routers := account.GetResourceRoutersMap() + networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, validatedPeers, resourcePolicies, routers, nil, account.GetActiveGroupUsers()) } proxyNetworkMap, ok := proxyNetworkMaps[peer.ID] diff --git a/management/server/types/holder.go b/management/server/types/holder.go index 3996db2b6..ad7d07522 100644 --- a/management/server/types/holder.go +++ b/management/server/types/holder.go @@ -25,6 +25,10 @@ func (h *Holder) GetAccount(id string) *Account { func (h *Holder) AddAccount(account *Account) { h.mu.Lock() defer h.mu.Unlock() + a := h.accounts[account.Id] + if a != nil && a.Network.CurrentSerial() >= account.Network.CurrentSerial() { + return + } h.accounts[account.Id] = account } diff --git a/management/server/types/networkmap.go b/management/server/types/networkmap.go index c1099726f..ff81e5dc1 100644 --- a/management/server/types/networkmap.go +++ b/management/server/types/networkmap.go @@ -36,14 +36,21 @@ func (a *Account) OnPeerAddedUpdNetworkMapCache(peerId string) error { if a.NetworkMapCache == nil { return nil } - return a.NetworkMapCache.OnPeerAddedIncremental(peerId) + return a.NetworkMapCache.OnPeerAddedIncremental(a, peerId) +} + +func (a *Account) OnPeersAddedUpdNetworkMapCache(peerIds ...string) { + if a.NetworkMapCache == nil { + return + } + a.NetworkMapCache.EnqueuePeersForIncrementalAdd(a, peerIds...) } func (a *Account) OnPeerDeletedUpdNetworkMapCache(peerId string) error { if a.NetworkMapCache == nil { return nil } - return a.NetworkMapCache.OnPeerDeleted(peerId) + return a.NetworkMapCache.OnPeerDeleted(a, peerId) } func (a *Account) UpdatePeerInNetworkMapCache(peer *nbpeer.Peer) { diff --git a/management/server/types/networkmap_golden_test.go b/management/server/types/networkmap_golden_test.go index 913094e4c..9135024d2 100644 --- a/management/server/types/networkmap_golden_test.go +++ b/management/server/types/networkmap_golden_test.go @@ -266,7 +266,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerAdded(t *testing.T) { account.Network.Serial++ } - err := builder.OnPeerAddedIncremental(newPeerID) + err := builder.OnPeerAddedIncremental(account, newPeerID) require.NoError(t, err, "error adding peer to cache") networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) @@ -328,7 +328,7 @@ func BenchmarkGetPeerNetworkMap_AfterPeerAdded(b *testing.B) { b.ResetTimer() b.Run("new builder after add", func(b *testing.B) { for i := 0; i < b.N; i++ { - _ = builder.OnPeerAddedIncremental(newPeerID) + _ = builder.OnPeerAddedIncremental(account, newPeerID) for _, testingPeerID := range peerIDs { _ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) } @@ -473,7 +473,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerAddedRouter(t *testing.T) { account.Network.Serial++ } - err := builder.OnPeerAddedIncremental(newRouterID) + err := builder.OnPeerAddedIncremental(account, newRouterID) require.NoError(t, err, "error adding router to cache") networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) @@ -558,7 +558,7 @@ func BenchmarkGetPeerNetworkMap_AfterRouterPeerAdded(b *testing.B) { b.ResetTimer() b.Run("new builder after add", func(b *testing.B) { for i := 0; i < b.N; i++ { - _ = builder.OnPeerAddedIncremental(newRouterID) + _ = builder.OnPeerAddedIncremental(account, newRouterID) for _, testingPeerID := range peerIDs { _ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) } @@ -662,7 +662,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerDeleted(t *testing.T) { account.Network.Serial++ } - err := builder.OnPeerDeleted(deletedPeerID) + err := builder.OnPeerDeleted(account, deletedPeerID) require.NoError(t, err, "error deleting peer from cache") networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) @@ -794,7 +794,7 @@ func TestGetPeerNetworkMap_Golden_New_WithDeletedRouterPeer(t *testing.T) { account.Network.Serial++ } - err := builder.OnPeerDeleted(deletedRouterID) + err := builder.OnPeerDeleted(account, deletedRouterID) require.NoError(t, err, "error deleting routing peer from cache") networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) @@ -855,7 +855,7 @@ func BenchmarkGetPeerNetworkMap_AfterPeerDeleted(b *testing.B) { b.ResetTimer() b.Run("new builder after delete", func(b *testing.B) { for i := 0; i < b.N; i++ { - _ = builder.OnPeerDeleted(deletedPeerID) + _ = builder.OnPeerDeleted(account, deletedPeerID) for _, testingPeerID := range peerIDs { _ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) } @@ -1067,3 +1067,85 @@ func createTestAccountWithEntities() *types.Account { return account } + +func TestGetPeerNetworkMap_Golden_New_WithOnPeerAddedRouter_Batched(t *testing.T) { + account := createTestAccountWithEntities() + + ctx := context.Background() + validatedPeersMap := make(map[string]struct{}) + for i := range numPeers { + peerID := fmt.Sprintf("peer-%d", i) + if peerID == offlinePeerID { + continue + } + validatedPeersMap[peerID] = struct{}{} + } + + builder := types.NewNetworkMapBuilder(account, validatedPeersMap) + + newRouterID := "peer-new-router-102" + newRouterIP := net.IP{100, 64, 1, 2} + newRouter := &nbpeer.Peer{ + ID: newRouterID, + IP: newRouterIP, + Key: fmt.Sprintf("key-%s", newRouterID), + DNSLabel: "newrouter102", + Status: &nbpeer.PeerStatus{Connected: true, LastSeen: time.Now()}, + UserID: "user-admin", + Meta: nbpeer.PeerSystemMeta{WtVersion: "0.26.0", GoOS: "linux"}, + LastLogin: func() *time.Time { t := time.Now(); return &t }(), + } + + account.Peers[newRouterID] = newRouter + + if opsGroup, exists := account.Groups[opsGroupID]; exists { + opsGroup.Peers = append(opsGroup.Peers, newRouterID) + } + if allGroup, exists := account.Groups[allGroupID]; exists { + allGroup.Peers = append(allGroup.Peers, newRouterID) + } + + newRoute := &route.Route{ + ID: route.ID("route-new-router"), + Network: netip.MustParsePrefix("172.16.0.0/24"), + Peer: newRouter.Key, + PeerID: newRouterID, + Description: "Route from new router", + Enabled: true, + PeerGroups: []string{opsGroupID}, + Groups: []string{devGroupID, opsGroupID}, + AccessControlGroups: []string{devGroupID}, + AccountID: account.Id, + } + account.Routes[newRoute.ID] = newRoute + + validatedPeersMap[newRouterID] = struct{}{} + + if account.Network != nil { + account.Network.Serial++ + } + + builder.EnqueuePeersForIncrementalAdd(account, newRouterID) + + time.Sleep(100 * time.Millisecond) + + networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil) + + normalizeAndSortNetworkMap(networkMap) + + jsonData, err := json.MarshalIndent(networkMap, "", " ") + require.NoError(t, err, "error marshaling network map to JSON") + + goldenFilePath := filepath.Join("testdata", "networkmap_golden_new_with_onpeeradded_router.json") + + t.Log("Update golden file with OnPeerAdded router...") + err = os.MkdirAll(filepath.Dir(goldenFilePath), 0755) + require.NoError(t, err) + err = os.WriteFile(goldenFilePath, jsonData, 0644) + require.NoError(t, err) + + expectedJSON, err := os.ReadFile(goldenFilePath) + require.NoError(t, err, "error reading golden file") + + require.JSONEq(t, string(expectedJSON), string(jsonData), "network map from NEW builder with OnPeerAdded router does not match golden file") +} diff --git a/management/server/types/networkmapbuilder.go b/management/server/types/networkmapbuilder.go index 5790f1646..a508cf725 100644 --- a/management/server/types/networkmapbuilder.go +++ b/management/server/types/networkmapbuilder.go @@ -7,7 +7,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -27,6 +26,9 @@ const ( v6AllWildcard = "::/0" fw = "fw:" rfw = "route-fw:" + + szAddPeerBatch = 10 + maxPeerAddRetries = 20 ) type NetworkMapCache struct { @@ -75,9 +77,19 @@ type PeerRoutesView struct { } type NetworkMapBuilder struct { - account atomic.Pointer[Account] + account *Account cache *NetworkMapCache validatedPeers map[string]struct{} + + apb addPeerBatch +} + +type addPeerBatch struct { + mu sync.Mutex + sg *sync.Cond + ids []string + la *Account + retryCount map[string]int } func NewNetworkMapBuilder(account *Account, validatedPeers map[string]struct{}) *NetworkMapBuilder { @@ -102,11 +114,16 @@ func NewNetworkMapBuilder(account *Account, validatedPeers map[string]struct{}) }, validatedPeers: make(map[string]struct{}), } - builder.account.Store(account) + builder.apb.sg = sync.NewCond(&builder.apb.mu) + builder.apb.ids = make([]string, 0, szAddPeerBatch) + builder.apb.la = account + builder.apb.retryCount = make(map[string]int) + maps.Copy(builder.validatedPeers, validatedPeers) builder.initialBuild(account) + go builder.incAddPeerLoop() return builder } @@ -114,6 +131,8 @@ func (b *NetworkMapBuilder) initialBuild(account *Account) { b.cache.mu.Lock() defer b.cache.mu.Unlock() + b.account = account + start := time.Now() b.buildGlobalIndexes(account) @@ -259,6 +278,7 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n validatedPeersMap map[string]struct{}, ) ([]*nbpeer.Peer, []*FirewallRule) { peerID := peer.ID + ctx := context.Background() peerGroups := b.cache.peerToGroups[peerID] peerGroupsMap := make(map[string]struct{}, len(peerGroups)) @@ -274,6 +294,9 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n for _, group := range peerGroups { policies := b.cache.groupToPolicies[group] for _, policy := range policies { + if isValid := account.validatePostureChecksOnPeer(ctx, policy.SourcePostureChecks, peerID); !isValid { + continue + } rules := b.cache.policyToRules[policy.ID] for _, rule := range rules { var sourcePeers, destinationPeers []*nbpeer.Peer @@ -316,13 +339,13 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n if rule.Bidirectional { if peerInSources { b.generateResourcescached( - account, rule, destinationPeers, FirewallRuleDirectionIN, + rule, destinationPeers, FirewallRuleDirectionIN, peer, &peers, &fwRules, peersExists, rulesExists, ) } if peerInDestinations { b.generateResourcescached( - account, rule, sourcePeers, FirewallRuleDirectionOUT, + rule, sourcePeers, FirewallRuleDirectionOUT, peer, &peers, &fwRules, peersExists, rulesExists, ) } @@ -330,14 +353,14 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n if peerInSources { b.generateResourcescached( - account, rule, destinationPeers, FirewallRuleDirectionOUT, + rule, destinationPeers, FirewallRuleDirectionOUT, peer, &peers, &fwRules, peersExists, rulesExists, ) } if peerInDestinations { b.generateResourcescached( - account, rule, sourcePeers, FirewallRuleDirectionIN, + rule, sourcePeers, FirewallRuleDirectionIN, peer, &peers, &fwRules, peersExists, rulesExists, ) } @@ -398,14 +421,9 @@ func (b *NetworkMapBuilder) getPeersFromGroupscached(account *Account, groupIDs } func (b *NetworkMapBuilder) generateResourcescached( - account *Account, rule *PolicyRule, groupPeers []*nbpeer.Peer, direction int, targetPeer *nbpeer.Peer, + rule *PolicyRule, groupPeers []*nbpeer.Peer, direction int, targetPeer *nbpeer.Peer, peers *[]*nbpeer.Peer, rules *[]*FirewallRule, peersExists map[string]struct{}, rulesExists map[string]struct{}, ) { - isAll := false - if allGroup, err := account.GetGroupAll(); err == nil { - isAll = (len(allGroup.Peers) - 1) == len(groupPeers) - } - for _, peer := range groupPeers { if peer == nil { continue @@ -423,10 +441,6 @@ func (b *NetworkMapBuilder) generateResourcescached( Protocol: string(rule.Protocol), } - if isAll { - fr.PeerIP = allPeers - } - var s strings.Builder s.WriteString(rule.ID) s.WriteString(fr.PeerIP) @@ -931,8 +945,12 @@ func (b *NetworkMapBuilder) getPeerNSGroups(account *Account, peerID string, che return peerNSGroups } -func (b *NetworkMapBuilder) UpdateAccountPointer(account *Account) { - b.account.Store(account) +// lock should be held +func (b *NetworkMapBuilder) updateAccountLocked(account *Account) *Account { + if account.Network.CurrentSerial() > b.account.Network.CurrentSerial() { + b.account = account + } + return b.account } func (b *NetworkMapBuilder) GetPeerNetworkMap( @@ -940,16 +958,17 @@ func (b *NetworkMapBuilder) GetPeerNetworkMap( validatedPeers map[string]struct{}, metrics *telemetry.AccountManagerMetrics, ) *NetworkMap { start := time.Now() - account := b.account.Load() + + b.cache.mu.RLock() + defer b.cache.mu.RUnlock() + + account := b.account peer := account.GetPeer(peerID) if peer == nil { return &NetworkMap{Network: account.Network.Copy()} } - b.cache.mu.RLock() - defer b.cache.mu.RUnlock() - aclView := b.cache.peerACLs[peerID] routesView := b.cache.peerRoutes[peerID] dnsConfig := b.cache.peerDNS[peerID] @@ -1013,6 +1032,8 @@ func (b *NetworkMapBuilder) assembleNetworkMap( for _, ruleID := range aclView.FirewallRuleIDs { if rule := b.cache.globalRules[ruleID]; rule != nil { firewallRules = append(firewallRules, rule) + } else { + log.Debugf("NetworkMapBuilder: peer %s assembling network map has no fwrule %s in globalRules", peer.ID, ruleID) } } @@ -1119,6 +1140,106 @@ func (b *NetworkMapBuilder) isPeerRouter(account *Account, peerID string) bool { return false } +func (b *NetworkMapBuilder) incAddPeerLoop() { + for { + b.apb.mu.Lock() + if len(b.apb.ids) == 0 { + b.apb.sg.Wait() + } + b.addPeersIncrementally() + b.apb.mu.Unlock() + } +} + +// lock on b.apb level should be held +func (b *NetworkMapBuilder) addPeersIncrementally() { + peers := slices.Clone(b.apb.ids) + clear(b.apb.ids) + b.apb.ids = b.apb.ids[:0] + latestAcc := b.apb.la + b.apb.mu.Unlock() + + tt := time.Now() + b.cache.mu.Lock() + defer b.cache.mu.Unlock() + + account := b.updateAccountLocked(latestAcc) + + log.Debugf("NetworkMapBuilder: Starting incremental add of %d peers", len(peers)) + + allUpdates := make(map[string]*PeerUpdateDelta) + + for _, peerID := range peers { + peer := account.GetPeer(peerID) + if peer == nil { + b.apb.mu.Lock() + retries := b.apb.retryCount[peerID] + b.apb.mu.Unlock() + + if retries >= maxPeerAddRetries { + log.Errorf("NetworkMapBuilder: peer %s not found in account %s after %d retries, giving up", peerID, account.Id, retries) + b.apb.mu.Lock() + delete(b.apb.retryCount, peerID) + b.apb.mu.Unlock() + continue + } + + log.Warnf("NetworkMapBuilder: peer %s not found in account %s, retry %d/%d", peerID, account.Id, retries+1, maxPeerAddRetries) + b.apb.mu.Lock() + b.apb.retryCount[peerID] = retries + 1 + b.apb.mu.Unlock() + b.enqueuePeersForIncrementalAdd(latestAcc, peerID) + continue + } + + b.apb.mu.Lock() + delete(b.apb.retryCount, peerID) + b.apb.mu.Unlock() + + b.validatedPeers[peerID] = struct{}{} + b.cache.globalPeers[peerID] = peer + + peerGroups := b.updateIndexesForNewPeer(account, peerID) + b.buildPeerACLView(account, peerID) + b.buildPeerRoutesView(account, peerID) + b.buildPeerDNSView(account, peerID) + + peerDeltas := b.collectDeltasForNewPeer(account, peerID, peerGroups) + for affectedPeerID, delta := range peerDeltas { + if existing, ok := allUpdates[affectedPeerID]; ok { + existing.mergeFrom(delta) + continue + } + allUpdates[affectedPeerID] = delta + } + } + + for affectedPeerID, delta := range allUpdates { + b.applyDeltaToPeer(account, affectedPeerID, delta) + } + + log.Debugf("NetworkMapBuilder: Added %d peers to cache, affected %d peers, took %s", len(peers), len(allUpdates), time.Since(tt)) + + b.apb.mu.Lock() + if len(b.apb.ids) > 0 { + b.apb.sg.Signal() + } +} + +func (b *NetworkMapBuilder) enqueuePeersForIncrementalAdd(acc *Account, peerIDs ...string) { + b.apb.mu.Lock() + b.apb.ids = append(b.apb.ids, peerIDs...) + if b.apb.la != nil && acc.Network.CurrentSerial() > b.apb.la.Network.CurrentSerial() { + b.apb.la = acc + } + b.apb.sg.Signal() + b.apb.mu.Unlock() +} + +func (b *NetworkMapBuilder) EnqueuePeersForIncrementalAdd(acc *Account, peerIDs ...string) { + b.enqueuePeersForIncrementalAdd(acc, peerIDs...) +} + type ViewDelta struct { AddedPeerIDs []string RemovedPeerIDs []string @@ -1126,17 +1247,18 @@ type ViewDelta struct { RemovedRuleIDs []string } -func (b *NetworkMapBuilder) OnPeerAddedIncremental(peerID string) error { +func (b *NetworkMapBuilder) OnPeerAddedIncremental(acc *Account, peerID string) error { tt := time.Now() - account := b.account.Load() - peer := account.GetPeer(peerID) + peer := acc.GetPeer(peerID) if peer == nil { - return fmt.Errorf("peer %s not found in account", peerID) + return fmt.Errorf("NetworkMapBuilder: peer %s not found in account", peerID) } b.cache.mu.Lock() defer b.cache.mu.Unlock() + account := b.updateAccountLocked(acc) + log.Debugf("NetworkMapBuilder: Adding peer %s (IP: %s) to cache", peerID, peer.IP.String()) b.validatedPeers[peerID] = struct{}{} @@ -1195,6 +1317,13 @@ func (b *NetworkMapBuilder) updateIndexesForNewPeer(account *Account, peerID str } func (b *NetworkMapBuilder) incrementalUpdateAffectedPeers(account *Account, newPeerID string, peerGroups []string) { + updates := b.collectDeltasForNewPeer(account, newPeerID, peerGroups) + for affectedPeerID, delta := range updates { + b.applyDeltaToPeer(account, affectedPeerID, delta) + } +} + +func (b *NetworkMapBuilder) collectDeltasForNewPeer(account *Account, newPeerID string, peerGroups []string) map[string]*PeerUpdateDelta { updates := b.calculateIncrementalUpdates(account, newPeerID, peerGroups) if b.isPeerRouter(account, newPeerID) { @@ -1214,9 +1343,7 @@ func (b *NetworkMapBuilder) incrementalUpdateAffectedPeers(account *Account, new } } - for affectedPeerID, delta := range updates { - b.applyDeltaToPeer(account, affectedPeerID, delta) - } + return updates } func (b *NetworkMapBuilder) findPeersAffectedByNewRouter(account *Account, newRouterID string, routerGroups []string) map[string]struct{} { @@ -1410,8 +1537,8 @@ func (b *NetworkMapBuilder) calculateNewRouterNetworkResourceUpdates( updates[peerID] = delta } - if delta.AddConnectedPeer == "" { - delta.AddConnectedPeer = newPeerID + if !slices.Contains(delta.AddConnectedPeers, newPeerID) { + delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID) } delta.RebuildRoutesView = true @@ -1540,8 +1667,8 @@ func (b *NetworkMapBuilder) calculateNetworkResourceFirewallUpdates( updates[routerPeerID] = delta } - if delta.AddConnectedPeer == "" { - delta.AddConnectedPeer = newPeerID + if !slices.Contains(delta.AddConnectedPeers, newPeerID) { + delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID) } delta.RebuildRoutesView = true @@ -1551,13 +1678,63 @@ func (b *NetworkMapBuilder) calculateNetworkResourceFirewallUpdates( type PeerUpdateDelta struct { PeerID string - AddConnectedPeer string + AddConnectedPeers []string AddFirewallRules []*FirewallRuleDelta AddRoutes []route.ID UpdateRouteFirewallRules []*RouteFirewallRuleUpdate UpdateDNS bool RebuildRoutesView bool } + +func (d *PeerUpdateDelta) mergeFrom(other *PeerUpdateDelta) { + for _, peerID := range other.AddConnectedPeers { + if !slices.Contains(d.AddConnectedPeers, peerID) { + d.AddConnectedPeers = append(d.AddConnectedPeers, peerID) + } + } + + existingRuleIDs := make(map[string]struct{}, len(d.AddFirewallRules)) + for _, rule := range d.AddFirewallRules { + existingRuleIDs[rule.RuleID] = struct{}{} + } + for _, rule := range other.AddFirewallRules { + if _, exists := existingRuleIDs[rule.RuleID]; !exists { + d.AddFirewallRules = append(d.AddFirewallRules, rule) + existingRuleIDs[rule.RuleID] = struct{}{} + } + } + + for _, routeID := range other.AddRoutes { + if !slices.Contains(d.AddRoutes, routeID) { + d.AddRoutes = append(d.AddRoutes, routeID) + } + } + + existingRouteUpdates := make(map[string]map[string]struct{}) + for _, update := range d.UpdateRouteFirewallRules { + if existingRouteUpdates[update.RuleID] == nil { + existingRouteUpdates[update.RuleID] = make(map[string]struct{}) + } + existingRouteUpdates[update.RuleID][update.AddSourceIP] = struct{}{} + } + for _, update := range other.UpdateRouteFirewallRules { + if existingRouteUpdates[update.RuleID] == nil { + existingRouteUpdates[update.RuleID] = make(map[string]struct{}) + } + if _, exists := existingRouteUpdates[update.RuleID][update.AddSourceIP]; !exists { + d.UpdateRouteFirewallRules = append(d.UpdateRouteFirewallRules, update) + existingRouteUpdates[update.RuleID][update.AddSourceIP] = struct{}{} + } + } + + if other.UpdateDNS { + d.UpdateDNS = true + } + if other.RebuildRoutesView { + d.RebuildRoutesView = true + } +} + type FirewallRuleDelta struct { Rule *FirewallRule RuleID string @@ -1659,11 +1836,13 @@ func (b *NetworkMapBuilder) addOrUpdateFirewallRuleInDelta( delta := updates[targetPeerID] if delta == nil { delta = &PeerUpdateDelta{ - PeerID: targetPeerID, - AddConnectedPeer: newPeerID, - AddFirewallRules: make([]*FirewallRuleDelta, 0), + PeerID: targetPeerID, + AddConnectedPeers: []string{newPeerID}, + AddFirewallRules: make([]*FirewallRuleDelta, 0), } updates[targetPeerID] = delta + } else if !slices.Contains(delta.AddConnectedPeers, newPeerID) { + delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID) } baseRule.PeerIP = peerIP @@ -1689,10 +1868,12 @@ func (b *NetworkMapBuilder) addOrUpdateFirewallRuleInDelta( } func (b *NetworkMapBuilder) applyDeltaToPeer(account *Account, peerID string, delta *PeerUpdateDelta) { - if delta.AddConnectedPeer != "" || len(delta.AddFirewallRules) > 0 { + if len(delta.AddConnectedPeers) > 0 || len(delta.AddFirewallRules) > 0 { if aclView := b.cache.peerACLs[peerID]; aclView != nil { - if delta.AddConnectedPeer != "" && !slices.Contains(aclView.ConnectedPeerIDs, delta.AddConnectedPeer) { - aclView.ConnectedPeerIDs = append(aclView.ConnectedPeerIDs, delta.AddConnectedPeer) + for _, connectedPeerID := range delta.AddConnectedPeers { + if !slices.Contains(aclView.ConnectedPeerIDs, connectedPeerID) { + aclView.ConnectedPeerIDs = append(aclView.ConnectedPeerIDs, connectedPeerID) + } } for _, ruleDelta := range delta.AddFirewallRules { @@ -1748,11 +1929,11 @@ func (b *NetworkMapBuilder) updateRouteFirewallRules(routesView *PeerRoutesView, } } -func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error { +func (b *NetworkMapBuilder) OnPeerDeleted(acc *Account, peerID string) error { b.cache.mu.Lock() defer b.cache.mu.Unlock() - account := b.account.Load() + account := b.updateAccountLocked(acc) deletedPeer := b.cache.globalPeers[peerID] if deletedPeer == nil { @@ -1858,11 +2039,16 @@ func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error { b.buildPeerRoutesView(account, affectedPeerID) } - peerDeletionUpdates := b.findPeersAffectedByDeletedPeerACL(peerID, peerIP) + peersToRebuildACL := make(map[string]struct{}) + peerDeletionUpdates := b.findPeersAffectedByDeletedPeerACL(peerID, peerIP, peerGroups, peersToRebuildACL) for affectedPeerID, updates := range peerDeletionUpdates { b.applyDeletionUpdates(affectedPeerID, updates) } + for affectedPeerID := range peersToRebuildACL { + b.buildPeerACLView(account, affectedPeerID) + } + b.cleanupUnusedRules() log.Debugf("NetworkMapBuilder: Deleted peer %s, affected %d other peers", peerID, len(affectedPeers)) @@ -1873,6 +2059,8 @@ func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error { func (b *NetworkMapBuilder) findPeersAffectedByDeletedPeerACL( deletedPeerID string, peerIP string, + peerGroups []string, + peersToRebuildACL map[string]struct{}, ) map[string]*PeerDeletionUpdate { affected := make(map[string]*PeerDeletionUpdate) @@ -1882,26 +2070,47 @@ func (b *NetworkMapBuilder) findPeersAffectedByDeletedPeerACL( continue } - if !slices.Contains(aclView.ConnectedPeerIDs, deletedPeerID) { - continue - } - if affected[peerID] == nil { - affected[peerID] = &PeerDeletionUpdate{ - RemovePeerID: deletedPeerID, - PeerIP: peerIP, + if slices.Contains(aclView.ConnectedPeerIDs, deletedPeerID) { + peersToRebuildACL[peerID] = struct{}{} + if affected[peerID] == nil { + affected[peerID] = &PeerDeletionUpdate{ + RemovePeerID: deletedPeerID, + PeerIP: peerIP, + } } } + } - for _, ruleID := range aclView.FirewallRuleIDs { - if rule := b.cache.globalRules[ruleID]; rule != nil && rule.PeerIP == peerIP { - affected[peerID].RemoveFirewallRuleIDs = append( - affected[peerID].RemoveFirewallRuleIDs, - ruleID, - ) + affectedRouteOwners := make(map[string]struct{}) + + for _, groupID := range peerGroups { + if routeMap, ok := b.cache.acgToRoutes[groupID]; ok { + for _, info := range routeMap { + if info.PeerID != deletedPeerID { + affectedRouteOwners[info.PeerID] = struct{}{} + } } } } + for _, info := range b.cache.noACGRoutes { + if info.PeerID != deletedPeerID { + affectedRouteOwners[info.PeerID] = struct{}{} + } + } + + for ownerPeerID := range affectedRouteOwners { + if affected[ownerPeerID] == nil { + affected[ownerPeerID] = &PeerDeletionUpdate{ + RemovePeerID: deletedPeerID, + PeerIP: peerIP, + RemoveFromSourceRanges: true, + } + } else { + affected[ownerPeerID].RemoveFromSourceRanges = true + } + } + return affected } @@ -1914,18 +2123,6 @@ type PeerDeletionUpdate struct { } func (b *NetworkMapBuilder) applyDeletionUpdates(peerID string, updates *PeerDeletionUpdate) { - if aclView := b.cache.peerACLs[peerID]; aclView != nil { - aclView.ConnectedPeerIDs = slices.DeleteFunc(aclView.ConnectedPeerIDs, func(id string) bool { - return id == updates.RemovePeerID - }) - - if len(updates.RemoveFirewallRuleIDs) > 0 { - aclView.FirewallRuleIDs = slices.DeleteFunc(aclView.FirewallRuleIDs, func(ruleID string) bool { - return slices.Contains(updates.RemoveFirewallRuleIDs, ruleID) - }) - } - } - if routesView := b.cache.peerRoutes[peerID]; routesView != nil { if len(updates.RemoveRouteIDs) > 0 { routesView.NetworkResourceIDs = slices.DeleteFunc(routesView.NetworkResourceIDs, func(routeID route.ID) bool { diff --git a/management/server/user.go b/management/server/user.go index 9d4620462..e393b2c04 100644 --- a/management/server/user.go +++ b/management/server/user.go @@ -994,6 +994,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou ) } + if len(peerIDs) != 0 { + if err := am.Store.IncrementNetworkSerial(ctx, accountID); err != nil { + return err + } + } + err = am.networkMapController.OnPeersUpdated(ctx, accountID, peerIDs) if err != nil { return fmt.Errorf("notify network map controller of peer update: %w", err)