[management] network map builder concurrent batch processing for peer updates (#5040)

This commit is contained in:
Vlad
2026-01-06 19:25:55 +01:00
committed by GitHub
parent 9bd578d4ea
commit 7142d45ef3
6 changed files with 390 additions and 94 deletions

View File

@@ -447,7 +447,9 @@ func (c *Controller) GetValidatedPeerWithMap(ctx context.Context, isRequiresAppr
if c.experimentalNetworkMap(accountID) {
networkMap = c.getPeerNetworkMapExp(ctx, peer.AccountID, peer.ID, approvedPeersMap, customZone, c.accountManagerMetrics)
} else {
networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), c.accountManagerMetrics, account.GetActiveGroupUsers())
resourcePolicies := account.GetResourcePoliciesMap()
routers := account.GetResourceRoutersMap()
networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, resourcePolicies, routers, c.accountManagerMetrics, account.GetActiveGroupUsers())
}
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
@@ -480,12 +482,13 @@ func (c *Controller) getPeerNetworkMapExp(
Network: &types.Network{},
}
}
return account.GetPeerNetworkMapExp(ctx, peerId, customZone, validatedPeers, metrics)
}
func (c *Controller) onPeerAddedUpdNetworkMapCache(account *types.Account, peerId string) error {
func (c *Controller) onPeersAddedUpdNetworkMapCache(account *types.Account, peerIds ...string) {
c.enrichAccountFromHolder(account)
return account.OnPeerAddedUpdNetworkMapCache(peerId)
account.OnPeersAddedUpdNetworkMapCache(peerIds...)
}
func (c *Controller) onPeerDeletedUpdNetworkMapCache(account *types.Account, peerId string) error {
@@ -537,7 +540,6 @@ func (c *Controller) enrichAccountFromHolder(account *types.Account) {
if account.NetworkMapCache == nil {
return
}
account.NetworkMapCache.UpdateAccountPointer(account)
c.holder.AddAccount(account)
}
@@ -715,18 +717,14 @@ func (c *Controller) OnPeersUpdated(ctx context.Context, accountID string, peerI
}
func (c *Controller) OnPeersAdded(ctx context.Context, accountID string, peerIDs []string) error {
for _, peerID := range peerIDs {
if c.experimentalNetworkMap(accountID) {
account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return err
}
err = c.onPeerAddedUpdNetworkMapCache(account, peerID)
if err != nil {
return err
}
log.WithContext(ctx).Debugf("OnPeersAdded call to add peers: %v", peerIDs)
if c.experimentalNetworkMap(accountID) {
account, err := c.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return err
}
log.WithContext(ctx).Debugf("peers are ready to be added to networkmap cache: %v", peerIDs)
c.onPeersAddedUpdNetworkMapCache(account, peerIDs...)
}
return c.bufferSendUpdateAccountPeers(ctx, accountID)
}
@@ -813,7 +811,9 @@ func (c *Controller) GetNetworkMap(ctx context.Context, peerID string) (*types.N
if c.experimentalNetworkMap(peer.AccountID) {
networkMap = c.getPeerNetworkMapExp(ctx, peer.AccountID, peerID, validatedPeers, customZone, nil)
} else {
networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, validatedPeers, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), nil, account.GetActiveGroupUsers())
resourcePolicies := account.GetResourcePoliciesMap()
routers := account.GetResourceRoutersMap()
networkMap = account.GetPeerNetworkMap(ctx, peer.ID, customZone, validatedPeers, resourcePolicies, routers, nil, account.GetActiveGroupUsers())
}
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]

View File

@@ -25,6 +25,10 @@ func (h *Holder) GetAccount(id string) *Account {
func (h *Holder) AddAccount(account *Account) {
h.mu.Lock()
defer h.mu.Unlock()
a := h.accounts[account.Id]
if a != nil && a.Network.CurrentSerial() >= account.Network.CurrentSerial() {
return
}
h.accounts[account.Id] = account
}

View File

@@ -36,14 +36,21 @@ func (a *Account) OnPeerAddedUpdNetworkMapCache(peerId string) error {
if a.NetworkMapCache == nil {
return nil
}
return a.NetworkMapCache.OnPeerAddedIncremental(peerId)
return a.NetworkMapCache.OnPeerAddedIncremental(a, peerId)
}
func (a *Account) OnPeersAddedUpdNetworkMapCache(peerIds ...string) {
if a.NetworkMapCache == nil {
return
}
a.NetworkMapCache.EnqueuePeersForIncrementalAdd(a, peerIds...)
}
func (a *Account) OnPeerDeletedUpdNetworkMapCache(peerId string) error {
if a.NetworkMapCache == nil {
return nil
}
return a.NetworkMapCache.OnPeerDeleted(peerId)
return a.NetworkMapCache.OnPeerDeleted(a, peerId)
}
func (a *Account) UpdatePeerInNetworkMapCache(peer *nbpeer.Peer) {

View File

@@ -266,7 +266,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerAdded(t *testing.T) {
account.Network.Serial++
}
err := builder.OnPeerAddedIncremental(newPeerID)
err := builder.OnPeerAddedIncremental(account, newPeerID)
require.NoError(t, err, "error adding peer to cache")
networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
@@ -328,7 +328,7 @@ func BenchmarkGetPeerNetworkMap_AfterPeerAdded(b *testing.B) {
b.ResetTimer()
b.Run("new builder after add", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = builder.OnPeerAddedIncremental(newPeerID)
_ = builder.OnPeerAddedIncremental(account, newPeerID)
for _, testingPeerID := range peerIDs {
_ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
}
@@ -473,7 +473,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerAddedRouter(t *testing.T) {
account.Network.Serial++
}
err := builder.OnPeerAddedIncremental(newRouterID)
err := builder.OnPeerAddedIncremental(account, newRouterID)
require.NoError(t, err, "error adding router to cache")
networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
@@ -558,7 +558,7 @@ func BenchmarkGetPeerNetworkMap_AfterRouterPeerAdded(b *testing.B) {
b.ResetTimer()
b.Run("new builder after add", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = builder.OnPeerAddedIncremental(newRouterID)
_ = builder.OnPeerAddedIncremental(account, newRouterID)
for _, testingPeerID := range peerIDs {
_ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
}
@@ -662,7 +662,7 @@ func TestGetPeerNetworkMap_Golden_New_WithOnPeerDeleted(t *testing.T) {
account.Network.Serial++
}
err := builder.OnPeerDeleted(deletedPeerID)
err := builder.OnPeerDeleted(account, deletedPeerID)
require.NoError(t, err, "error deleting peer from cache")
networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
@@ -794,7 +794,7 @@ func TestGetPeerNetworkMap_Golden_New_WithDeletedRouterPeer(t *testing.T) {
account.Network.Serial++
}
err := builder.OnPeerDeleted(deletedRouterID)
err := builder.OnPeerDeleted(account, deletedRouterID)
require.NoError(t, err, "error deleting routing peer from cache")
networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
@@ -855,7 +855,7 @@ func BenchmarkGetPeerNetworkMap_AfterPeerDeleted(b *testing.B) {
b.ResetTimer()
b.Run("new builder after delete", func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = builder.OnPeerDeleted(deletedPeerID)
_ = builder.OnPeerDeleted(account, deletedPeerID)
for _, testingPeerID := range peerIDs {
_ = builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
}
@@ -1067,3 +1067,85 @@ func createTestAccountWithEntities() *types.Account {
return account
}
func TestGetPeerNetworkMap_Golden_New_WithOnPeerAddedRouter_Batched(t *testing.T) {
account := createTestAccountWithEntities()
ctx := context.Background()
validatedPeersMap := make(map[string]struct{})
for i := range numPeers {
peerID := fmt.Sprintf("peer-%d", i)
if peerID == offlinePeerID {
continue
}
validatedPeersMap[peerID] = struct{}{}
}
builder := types.NewNetworkMapBuilder(account, validatedPeersMap)
newRouterID := "peer-new-router-102"
newRouterIP := net.IP{100, 64, 1, 2}
newRouter := &nbpeer.Peer{
ID: newRouterID,
IP: newRouterIP,
Key: fmt.Sprintf("key-%s", newRouterID),
DNSLabel: "newrouter102",
Status: &nbpeer.PeerStatus{Connected: true, LastSeen: time.Now()},
UserID: "user-admin",
Meta: nbpeer.PeerSystemMeta{WtVersion: "0.26.0", GoOS: "linux"},
LastLogin: func() *time.Time { t := time.Now(); return &t }(),
}
account.Peers[newRouterID] = newRouter
if opsGroup, exists := account.Groups[opsGroupID]; exists {
opsGroup.Peers = append(opsGroup.Peers, newRouterID)
}
if allGroup, exists := account.Groups[allGroupID]; exists {
allGroup.Peers = append(allGroup.Peers, newRouterID)
}
newRoute := &route.Route{
ID: route.ID("route-new-router"),
Network: netip.MustParsePrefix("172.16.0.0/24"),
Peer: newRouter.Key,
PeerID: newRouterID,
Description: "Route from new router",
Enabled: true,
PeerGroups: []string{opsGroupID},
Groups: []string{devGroupID, opsGroupID},
AccessControlGroups: []string{devGroupID},
AccountID: account.Id,
}
account.Routes[newRoute.ID] = newRoute
validatedPeersMap[newRouterID] = struct{}{}
if account.Network != nil {
account.Network.Serial++
}
builder.EnqueuePeersForIncrementalAdd(account, newRouterID)
time.Sleep(100 * time.Millisecond)
networkMap := builder.GetPeerNetworkMap(ctx, testingPeerID, dns.CustomZone{}, validatedPeersMap, nil)
normalizeAndSortNetworkMap(networkMap)
jsonData, err := json.MarshalIndent(networkMap, "", " ")
require.NoError(t, err, "error marshaling network map to JSON")
goldenFilePath := filepath.Join("testdata", "networkmap_golden_new_with_onpeeradded_router.json")
t.Log("Update golden file with OnPeerAdded router...")
err = os.MkdirAll(filepath.Dir(goldenFilePath), 0755)
require.NoError(t, err)
err = os.WriteFile(goldenFilePath, jsonData, 0644)
require.NoError(t, err)
expectedJSON, err := os.ReadFile(goldenFilePath)
require.NoError(t, err, "error reading golden file")
require.JSONEq(t, string(expectedJSON), string(jsonData), "network map from NEW builder with OnPeerAdded router does not match golden file")
}

View File

@@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
@@ -27,6 +26,9 @@ const (
v6AllWildcard = "::/0"
fw = "fw:"
rfw = "route-fw:"
szAddPeerBatch = 10
maxPeerAddRetries = 20
)
type NetworkMapCache struct {
@@ -75,9 +77,19 @@ type PeerRoutesView struct {
}
type NetworkMapBuilder struct {
account atomic.Pointer[Account]
account *Account
cache *NetworkMapCache
validatedPeers map[string]struct{}
apb addPeerBatch
}
type addPeerBatch struct {
mu sync.Mutex
sg *sync.Cond
ids []string
la *Account
retryCount map[string]int
}
func NewNetworkMapBuilder(account *Account, validatedPeers map[string]struct{}) *NetworkMapBuilder {
@@ -102,11 +114,16 @@ func NewNetworkMapBuilder(account *Account, validatedPeers map[string]struct{})
},
validatedPeers: make(map[string]struct{}),
}
builder.account.Store(account)
builder.apb.sg = sync.NewCond(&builder.apb.mu)
builder.apb.ids = make([]string, 0, szAddPeerBatch)
builder.apb.la = account
builder.apb.retryCount = make(map[string]int)
maps.Copy(builder.validatedPeers, validatedPeers)
builder.initialBuild(account)
go builder.incAddPeerLoop()
return builder
}
@@ -114,6 +131,8 @@ func (b *NetworkMapBuilder) initialBuild(account *Account) {
b.cache.mu.Lock()
defer b.cache.mu.Unlock()
b.account = account
start := time.Now()
b.buildGlobalIndexes(account)
@@ -259,6 +278,7 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n
validatedPeersMap map[string]struct{},
) ([]*nbpeer.Peer, []*FirewallRule) {
peerID := peer.ID
ctx := context.Background()
peerGroups := b.cache.peerToGroups[peerID]
peerGroupsMap := make(map[string]struct{}, len(peerGroups))
@@ -274,6 +294,9 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n
for _, group := range peerGroups {
policies := b.cache.groupToPolicies[group]
for _, policy := range policies {
if isValid := account.validatePostureChecksOnPeer(ctx, policy.SourcePostureChecks, peerID); !isValid {
continue
}
rules := b.cache.policyToRules[policy.ID]
for _, rule := range rules {
var sourcePeers, destinationPeers []*nbpeer.Peer
@@ -316,13 +339,13 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n
if rule.Bidirectional {
if peerInSources {
b.generateResourcescached(
account, rule, destinationPeers, FirewallRuleDirectionIN,
rule, destinationPeers, FirewallRuleDirectionIN,
peer, &peers, &fwRules, peersExists, rulesExists,
)
}
if peerInDestinations {
b.generateResourcescached(
account, rule, sourcePeers, FirewallRuleDirectionOUT,
rule, sourcePeers, FirewallRuleDirectionOUT,
peer, &peers, &fwRules, peersExists, rulesExists,
)
}
@@ -330,14 +353,14 @@ func (b *NetworkMapBuilder) getPeerConnectionResources(account *Account, peer *n
if peerInSources {
b.generateResourcescached(
account, rule, destinationPeers, FirewallRuleDirectionOUT,
rule, destinationPeers, FirewallRuleDirectionOUT,
peer, &peers, &fwRules, peersExists, rulesExists,
)
}
if peerInDestinations {
b.generateResourcescached(
account, rule, sourcePeers, FirewallRuleDirectionIN,
rule, sourcePeers, FirewallRuleDirectionIN,
peer, &peers, &fwRules, peersExists, rulesExists,
)
}
@@ -398,14 +421,9 @@ func (b *NetworkMapBuilder) getPeersFromGroupscached(account *Account, groupIDs
}
func (b *NetworkMapBuilder) generateResourcescached(
account *Account, rule *PolicyRule, groupPeers []*nbpeer.Peer, direction int, targetPeer *nbpeer.Peer,
rule *PolicyRule, groupPeers []*nbpeer.Peer, direction int, targetPeer *nbpeer.Peer,
peers *[]*nbpeer.Peer, rules *[]*FirewallRule, peersExists map[string]struct{}, rulesExists map[string]struct{},
) {
isAll := false
if allGroup, err := account.GetGroupAll(); err == nil {
isAll = (len(allGroup.Peers) - 1) == len(groupPeers)
}
for _, peer := range groupPeers {
if peer == nil {
continue
@@ -423,10 +441,6 @@ func (b *NetworkMapBuilder) generateResourcescached(
Protocol: string(rule.Protocol),
}
if isAll {
fr.PeerIP = allPeers
}
var s strings.Builder
s.WriteString(rule.ID)
s.WriteString(fr.PeerIP)
@@ -931,8 +945,12 @@ func (b *NetworkMapBuilder) getPeerNSGroups(account *Account, peerID string, che
return peerNSGroups
}
func (b *NetworkMapBuilder) UpdateAccountPointer(account *Account) {
b.account.Store(account)
// lock should be held
func (b *NetworkMapBuilder) updateAccountLocked(account *Account) *Account {
if account.Network.CurrentSerial() > b.account.Network.CurrentSerial() {
b.account = account
}
return b.account
}
func (b *NetworkMapBuilder) GetPeerNetworkMap(
@@ -940,16 +958,17 @@ func (b *NetworkMapBuilder) GetPeerNetworkMap(
validatedPeers map[string]struct{}, metrics *telemetry.AccountManagerMetrics,
) *NetworkMap {
start := time.Now()
account := b.account.Load()
b.cache.mu.RLock()
defer b.cache.mu.RUnlock()
account := b.account
peer := account.GetPeer(peerID)
if peer == nil {
return &NetworkMap{Network: account.Network.Copy()}
}
b.cache.mu.RLock()
defer b.cache.mu.RUnlock()
aclView := b.cache.peerACLs[peerID]
routesView := b.cache.peerRoutes[peerID]
dnsConfig := b.cache.peerDNS[peerID]
@@ -1013,6 +1032,8 @@ func (b *NetworkMapBuilder) assembleNetworkMap(
for _, ruleID := range aclView.FirewallRuleIDs {
if rule := b.cache.globalRules[ruleID]; rule != nil {
firewallRules = append(firewallRules, rule)
} else {
log.Debugf("NetworkMapBuilder: peer %s assembling network map has no fwrule %s in globalRules", peer.ID, ruleID)
}
}
@@ -1119,6 +1140,106 @@ func (b *NetworkMapBuilder) isPeerRouter(account *Account, peerID string) bool {
return false
}
func (b *NetworkMapBuilder) incAddPeerLoop() {
for {
b.apb.mu.Lock()
if len(b.apb.ids) == 0 {
b.apb.sg.Wait()
}
b.addPeersIncrementally()
b.apb.mu.Unlock()
}
}
// lock on b.apb level should be held
func (b *NetworkMapBuilder) addPeersIncrementally() {
peers := slices.Clone(b.apb.ids)
clear(b.apb.ids)
b.apb.ids = b.apb.ids[:0]
latestAcc := b.apb.la
b.apb.mu.Unlock()
tt := time.Now()
b.cache.mu.Lock()
defer b.cache.mu.Unlock()
account := b.updateAccountLocked(latestAcc)
log.Debugf("NetworkMapBuilder: Starting incremental add of %d peers", len(peers))
allUpdates := make(map[string]*PeerUpdateDelta)
for _, peerID := range peers {
peer := account.GetPeer(peerID)
if peer == nil {
b.apb.mu.Lock()
retries := b.apb.retryCount[peerID]
b.apb.mu.Unlock()
if retries >= maxPeerAddRetries {
log.Errorf("NetworkMapBuilder: peer %s not found in account %s after %d retries, giving up", peerID, account.Id, retries)
b.apb.mu.Lock()
delete(b.apb.retryCount, peerID)
b.apb.mu.Unlock()
continue
}
log.Warnf("NetworkMapBuilder: peer %s not found in account %s, retry %d/%d", peerID, account.Id, retries+1, maxPeerAddRetries)
b.apb.mu.Lock()
b.apb.retryCount[peerID] = retries + 1
b.apb.mu.Unlock()
b.enqueuePeersForIncrementalAdd(latestAcc, peerID)
continue
}
b.apb.mu.Lock()
delete(b.apb.retryCount, peerID)
b.apb.mu.Unlock()
b.validatedPeers[peerID] = struct{}{}
b.cache.globalPeers[peerID] = peer
peerGroups := b.updateIndexesForNewPeer(account, peerID)
b.buildPeerACLView(account, peerID)
b.buildPeerRoutesView(account, peerID)
b.buildPeerDNSView(account, peerID)
peerDeltas := b.collectDeltasForNewPeer(account, peerID, peerGroups)
for affectedPeerID, delta := range peerDeltas {
if existing, ok := allUpdates[affectedPeerID]; ok {
existing.mergeFrom(delta)
continue
}
allUpdates[affectedPeerID] = delta
}
}
for affectedPeerID, delta := range allUpdates {
b.applyDeltaToPeer(account, affectedPeerID, delta)
}
log.Debugf("NetworkMapBuilder: Added %d peers to cache, affected %d peers, took %s", len(peers), len(allUpdates), time.Since(tt))
b.apb.mu.Lock()
if len(b.apb.ids) > 0 {
b.apb.sg.Signal()
}
}
func (b *NetworkMapBuilder) enqueuePeersForIncrementalAdd(acc *Account, peerIDs ...string) {
b.apb.mu.Lock()
b.apb.ids = append(b.apb.ids, peerIDs...)
if b.apb.la != nil && acc.Network.CurrentSerial() > b.apb.la.Network.CurrentSerial() {
b.apb.la = acc
}
b.apb.sg.Signal()
b.apb.mu.Unlock()
}
func (b *NetworkMapBuilder) EnqueuePeersForIncrementalAdd(acc *Account, peerIDs ...string) {
b.enqueuePeersForIncrementalAdd(acc, peerIDs...)
}
type ViewDelta struct {
AddedPeerIDs []string
RemovedPeerIDs []string
@@ -1126,17 +1247,18 @@ type ViewDelta struct {
RemovedRuleIDs []string
}
func (b *NetworkMapBuilder) OnPeerAddedIncremental(peerID string) error {
func (b *NetworkMapBuilder) OnPeerAddedIncremental(acc *Account, peerID string) error {
tt := time.Now()
account := b.account.Load()
peer := account.GetPeer(peerID)
peer := acc.GetPeer(peerID)
if peer == nil {
return fmt.Errorf("peer %s not found in account", peerID)
return fmt.Errorf("NetworkMapBuilder: peer %s not found in account", peerID)
}
b.cache.mu.Lock()
defer b.cache.mu.Unlock()
account := b.updateAccountLocked(acc)
log.Debugf("NetworkMapBuilder: Adding peer %s (IP: %s) to cache", peerID, peer.IP.String())
b.validatedPeers[peerID] = struct{}{}
@@ -1195,6 +1317,13 @@ func (b *NetworkMapBuilder) updateIndexesForNewPeer(account *Account, peerID str
}
func (b *NetworkMapBuilder) incrementalUpdateAffectedPeers(account *Account, newPeerID string, peerGroups []string) {
updates := b.collectDeltasForNewPeer(account, newPeerID, peerGroups)
for affectedPeerID, delta := range updates {
b.applyDeltaToPeer(account, affectedPeerID, delta)
}
}
func (b *NetworkMapBuilder) collectDeltasForNewPeer(account *Account, newPeerID string, peerGroups []string) map[string]*PeerUpdateDelta {
updates := b.calculateIncrementalUpdates(account, newPeerID, peerGroups)
if b.isPeerRouter(account, newPeerID) {
@@ -1214,9 +1343,7 @@ func (b *NetworkMapBuilder) incrementalUpdateAffectedPeers(account *Account, new
}
}
for affectedPeerID, delta := range updates {
b.applyDeltaToPeer(account, affectedPeerID, delta)
}
return updates
}
func (b *NetworkMapBuilder) findPeersAffectedByNewRouter(account *Account, newRouterID string, routerGroups []string) map[string]struct{} {
@@ -1410,8 +1537,8 @@ func (b *NetworkMapBuilder) calculateNewRouterNetworkResourceUpdates(
updates[peerID] = delta
}
if delta.AddConnectedPeer == "" {
delta.AddConnectedPeer = newPeerID
if !slices.Contains(delta.AddConnectedPeers, newPeerID) {
delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID)
}
delta.RebuildRoutesView = true
@@ -1540,8 +1667,8 @@ func (b *NetworkMapBuilder) calculateNetworkResourceFirewallUpdates(
updates[routerPeerID] = delta
}
if delta.AddConnectedPeer == "" {
delta.AddConnectedPeer = newPeerID
if !slices.Contains(delta.AddConnectedPeers, newPeerID) {
delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID)
}
delta.RebuildRoutesView = true
@@ -1551,13 +1678,63 @@ func (b *NetworkMapBuilder) calculateNetworkResourceFirewallUpdates(
type PeerUpdateDelta struct {
PeerID string
AddConnectedPeer string
AddConnectedPeers []string
AddFirewallRules []*FirewallRuleDelta
AddRoutes []route.ID
UpdateRouteFirewallRules []*RouteFirewallRuleUpdate
UpdateDNS bool
RebuildRoutesView bool
}
func (d *PeerUpdateDelta) mergeFrom(other *PeerUpdateDelta) {
for _, peerID := range other.AddConnectedPeers {
if !slices.Contains(d.AddConnectedPeers, peerID) {
d.AddConnectedPeers = append(d.AddConnectedPeers, peerID)
}
}
existingRuleIDs := make(map[string]struct{}, len(d.AddFirewallRules))
for _, rule := range d.AddFirewallRules {
existingRuleIDs[rule.RuleID] = struct{}{}
}
for _, rule := range other.AddFirewallRules {
if _, exists := existingRuleIDs[rule.RuleID]; !exists {
d.AddFirewallRules = append(d.AddFirewallRules, rule)
existingRuleIDs[rule.RuleID] = struct{}{}
}
}
for _, routeID := range other.AddRoutes {
if !slices.Contains(d.AddRoutes, routeID) {
d.AddRoutes = append(d.AddRoutes, routeID)
}
}
existingRouteUpdates := make(map[string]map[string]struct{})
for _, update := range d.UpdateRouteFirewallRules {
if existingRouteUpdates[update.RuleID] == nil {
existingRouteUpdates[update.RuleID] = make(map[string]struct{})
}
existingRouteUpdates[update.RuleID][update.AddSourceIP] = struct{}{}
}
for _, update := range other.UpdateRouteFirewallRules {
if existingRouteUpdates[update.RuleID] == nil {
existingRouteUpdates[update.RuleID] = make(map[string]struct{})
}
if _, exists := existingRouteUpdates[update.RuleID][update.AddSourceIP]; !exists {
d.UpdateRouteFirewallRules = append(d.UpdateRouteFirewallRules, update)
existingRouteUpdates[update.RuleID][update.AddSourceIP] = struct{}{}
}
}
if other.UpdateDNS {
d.UpdateDNS = true
}
if other.RebuildRoutesView {
d.RebuildRoutesView = true
}
}
type FirewallRuleDelta struct {
Rule *FirewallRule
RuleID string
@@ -1659,11 +1836,13 @@ func (b *NetworkMapBuilder) addOrUpdateFirewallRuleInDelta(
delta := updates[targetPeerID]
if delta == nil {
delta = &PeerUpdateDelta{
PeerID: targetPeerID,
AddConnectedPeer: newPeerID,
AddFirewallRules: make([]*FirewallRuleDelta, 0),
PeerID: targetPeerID,
AddConnectedPeers: []string{newPeerID},
AddFirewallRules: make([]*FirewallRuleDelta, 0),
}
updates[targetPeerID] = delta
} else if !slices.Contains(delta.AddConnectedPeers, newPeerID) {
delta.AddConnectedPeers = append(delta.AddConnectedPeers, newPeerID)
}
baseRule.PeerIP = peerIP
@@ -1689,10 +1868,12 @@ func (b *NetworkMapBuilder) addOrUpdateFirewallRuleInDelta(
}
func (b *NetworkMapBuilder) applyDeltaToPeer(account *Account, peerID string, delta *PeerUpdateDelta) {
if delta.AddConnectedPeer != "" || len(delta.AddFirewallRules) > 0 {
if len(delta.AddConnectedPeers) > 0 || len(delta.AddFirewallRules) > 0 {
if aclView := b.cache.peerACLs[peerID]; aclView != nil {
if delta.AddConnectedPeer != "" && !slices.Contains(aclView.ConnectedPeerIDs, delta.AddConnectedPeer) {
aclView.ConnectedPeerIDs = append(aclView.ConnectedPeerIDs, delta.AddConnectedPeer)
for _, connectedPeerID := range delta.AddConnectedPeers {
if !slices.Contains(aclView.ConnectedPeerIDs, connectedPeerID) {
aclView.ConnectedPeerIDs = append(aclView.ConnectedPeerIDs, connectedPeerID)
}
}
for _, ruleDelta := range delta.AddFirewallRules {
@@ -1748,11 +1929,11 @@ func (b *NetworkMapBuilder) updateRouteFirewallRules(routesView *PeerRoutesView,
}
}
func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error {
func (b *NetworkMapBuilder) OnPeerDeleted(acc *Account, peerID string) error {
b.cache.mu.Lock()
defer b.cache.mu.Unlock()
account := b.account.Load()
account := b.updateAccountLocked(acc)
deletedPeer := b.cache.globalPeers[peerID]
if deletedPeer == nil {
@@ -1858,11 +2039,16 @@ func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error {
b.buildPeerRoutesView(account, affectedPeerID)
}
peerDeletionUpdates := b.findPeersAffectedByDeletedPeerACL(peerID, peerIP)
peersToRebuildACL := make(map[string]struct{})
peerDeletionUpdates := b.findPeersAffectedByDeletedPeerACL(peerID, peerIP, peerGroups, peersToRebuildACL)
for affectedPeerID, updates := range peerDeletionUpdates {
b.applyDeletionUpdates(affectedPeerID, updates)
}
for affectedPeerID := range peersToRebuildACL {
b.buildPeerACLView(account, affectedPeerID)
}
b.cleanupUnusedRules()
log.Debugf("NetworkMapBuilder: Deleted peer %s, affected %d other peers", peerID, len(affectedPeers))
@@ -1873,6 +2059,8 @@ func (b *NetworkMapBuilder) OnPeerDeleted(peerID string) error {
func (b *NetworkMapBuilder) findPeersAffectedByDeletedPeerACL(
deletedPeerID string,
peerIP string,
peerGroups []string,
peersToRebuildACL map[string]struct{},
) map[string]*PeerDeletionUpdate {
affected := make(map[string]*PeerDeletionUpdate)
@@ -1882,26 +2070,47 @@ func (b *NetworkMapBuilder) findPeersAffectedByDeletedPeerACL(
continue
}
if !slices.Contains(aclView.ConnectedPeerIDs, deletedPeerID) {
continue
}
if affected[peerID] == nil {
affected[peerID] = &PeerDeletionUpdate{
RemovePeerID: deletedPeerID,
PeerIP: peerIP,
if slices.Contains(aclView.ConnectedPeerIDs, deletedPeerID) {
peersToRebuildACL[peerID] = struct{}{}
if affected[peerID] == nil {
affected[peerID] = &PeerDeletionUpdate{
RemovePeerID: deletedPeerID,
PeerIP: peerIP,
}
}
}
}
for _, ruleID := range aclView.FirewallRuleIDs {
if rule := b.cache.globalRules[ruleID]; rule != nil && rule.PeerIP == peerIP {
affected[peerID].RemoveFirewallRuleIDs = append(
affected[peerID].RemoveFirewallRuleIDs,
ruleID,
)
affectedRouteOwners := make(map[string]struct{})
for _, groupID := range peerGroups {
if routeMap, ok := b.cache.acgToRoutes[groupID]; ok {
for _, info := range routeMap {
if info.PeerID != deletedPeerID {
affectedRouteOwners[info.PeerID] = struct{}{}
}
}
}
}
for _, info := range b.cache.noACGRoutes {
if info.PeerID != deletedPeerID {
affectedRouteOwners[info.PeerID] = struct{}{}
}
}
for ownerPeerID := range affectedRouteOwners {
if affected[ownerPeerID] == nil {
affected[ownerPeerID] = &PeerDeletionUpdate{
RemovePeerID: deletedPeerID,
PeerIP: peerIP,
RemoveFromSourceRanges: true,
}
} else {
affected[ownerPeerID].RemoveFromSourceRanges = true
}
}
return affected
}
@@ -1914,18 +2123,6 @@ type PeerDeletionUpdate struct {
}
func (b *NetworkMapBuilder) applyDeletionUpdates(peerID string, updates *PeerDeletionUpdate) {
if aclView := b.cache.peerACLs[peerID]; aclView != nil {
aclView.ConnectedPeerIDs = slices.DeleteFunc(aclView.ConnectedPeerIDs, func(id string) bool {
return id == updates.RemovePeerID
})
if len(updates.RemoveFirewallRuleIDs) > 0 {
aclView.FirewallRuleIDs = slices.DeleteFunc(aclView.FirewallRuleIDs, func(ruleID string) bool {
return slices.Contains(updates.RemoveFirewallRuleIDs, ruleID)
})
}
}
if routesView := b.cache.peerRoutes[peerID]; routesView != nil {
if len(updates.RemoveRouteIDs) > 0 {
routesView.NetworkResourceIDs = slices.DeleteFunc(routesView.NetworkResourceIDs, func(routeID route.ID) bool {

View File

@@ -994,6 +994,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
)
}
if len(peerIDs) != 0 {
if err := am.Store.IncrementNetworkSerial(ctx, accountID); err != nil {
return err
}
}
err = am.networkMapController.OnPeersUpdated(ctx, accountID, peerIDs)
if err != nil {
return fmt.Errorf("notify network map controller of peer update: %w", err)