diff --git a/management/server/peer.go b/management/server/peer.go index 639ff8801..bb376c973 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -6,6 +6,7 @@ import ( b64 "encoding/base64" "fmt" "net" + "runtime" "slices" "strings" "sync" @@ -1289,53 +1290,64 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion) + processNetworkMap := func(p *nbpeer.Peer) { + start := time.Now() + + postureChecks, err := am.getPeerPostureChecks(account, p.ID) + if err != nil { + log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", p.ID, err) + return + } + + am.metrics.UpdateChannelMetrics().CountCalcPostureChecksDuration(time.Since(start)) + start = time.Now() + + var remotePeerNetworkMap *types.NetworkMap + + if am.experimentalNetworkMap(accountID) { + remotePeerNetworkMap = am.getPeerNetworkMapExp(ctx, p.AccountID, p.ID, approvedPeersMap, customZone, am.metrics.AccountManagerMetrics()) + } else { + remotePeerNetworkMap = account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics()) + } + + am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start)) + start = time.Now() + + proxyNetworkMap, ok := proxyNetworkMaps[p.ID] + if ok { + remotePeerNetworkMap.Merge(proxyNetworkMap) + } + am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start)) + + peerGroups := account.GetPeerGroups(p.ID) + start = time.Now() + update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort) + am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) + + am.peersUpdateManager.SendNetworkMapUpdate(ctx, p.ID, &UpdateMessage{Update: update}) + } + + numWorkers := runtime.NumCPU() + wch := make(chan *nbpeer.Peer, numWorkers) + wg.Add(numWorkers) + for range numWorkers { + go func() { + for peer := range wch { + processNetworkMap(peer) + } + wg.Done() + }() + } + for _, peer := range account.Peers { if !am.peersUpdateManager.HasChannel(peer.ID) { log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) continue } - - wg.Add(1) - go func(p *nbpeer.Peer) { - defer wg.Done() - - start := time.Now() - - postureChecks, err := am.getPeerPostureChecks(account, p.ID) - if err != nil { - log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", peer.ID, err) - return - } - - am.metrics.UpdateChannelMetrics().CountCalcPostureChecksDuration(time.Since(start)) - start = time.Now() - - var remotePeerNetworkMap *types.NetworkMap - - if am.experimentalNetworkMap(accountID) { - remotePeerNetworkMap = am.getPeerNetworkMapExp(ctx, p.AccountID, p.ID, approvedPeersMap, customZone, am.metrics.AccountManagerMetrics()) - } else { - remotePeerNetworkMap = account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics()) - } - - am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start)) - start = time.Now() - - proxyNetworkMap, ok := proxyNetworkMaps[p.ID] - if ok { - remotePeerNetworkMap.Merge(proxyNetworkMap) - } - am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start)) - - peerGroups := account.GetPeerGroups(p.ID) - start = time.Now() - update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort) - am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) - - am.peersUpdateManager.SendNetworkMapUpdate(ctx, p.ID, &UpdateMessage{Update: update}) - }(peer) + wch <- peer } + close(wch) wg.Wait() if am.metrics != nil { am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart)) diff --git a/management/server/types/networkmap.go b/management/server/types/networkmap.go index 58cb3c690..6f66b787f 100644 --- a/management/server/types/networkmap.go +++ b/management/server/types/networkmap.go @@ -31,6 +31,33 @@ type NetworkMap struct { ForwardingRules []*ForwardingRule } +// var nmapPool = sync.Pool{ +// New: func() any { +// return &NetworkMap{} +// }, +// } + +// func GetNetworkMap() *NetworkMap { +// n := nmapPool.Get().(*NetworkMap) +// n.reset() +// return n +// } + +// func PutNetworkMap(n *NetworkMap) { +// nmapPool.Put(n) +// } + +// func (n *NetworkMap) reset() { +// n.Peers = n.Peers[:0] +// n.Network = nil +// n.Routes = n.Routes[:0] +// n.DNSConfig = nbdns.Config{} +// n.OfflinePeers = n.OfflinePeers[:0] +// n.FirewallRules = n.FirewallRules[:0] +// n.RoutesFirewallRules = n.RoutesFirewallRules[:0] +// n.ForwardingRules = n.ForwardingRules[:0] +// } + func (nm *NetworkMap) Merge(other *NetworkMap) { nm.Peers = mergeUniquePeersByID(nm.Peers, other.Peers) nm.Routes = util.MergeUnique(nm.Routes, other.Routes) @@ -121,6 +148,14 @@ func (a *Account) GetPeerNetworkMap( dnsUpdate.NameServerGroups = getPeerNSGroups(a, peerID) } + // nm := GetNetworkMap() + // nm.Peers = peersToConnectIncludingRouters + // nm.Network = a.Network.Copy() + // nm.Routes = slices.Concat(networkResourcesRoutes, routesUpdate) + // nm.DNSConfig = dnsUpdate + // nm.OfflinePeers = expiredPeers + // nm.FirewallRules = firewallRules + // nm.RoutesFirewallRules = slices.Concat(networkResourcesFirewallRules, routesFirewallRules) nm := &NetworkMap{ Peers: peersToConnectIncludingRouters, Network: a.Network.Copy(),