Compare commits

..

2 Commits

Author SHA1 Message Date
Maycon Santos
ab06ef1812 [misc] Update careers page link 2026-06-25 02:21:34 +02:00
Viktor Liu
17b2044596 [client] Skip re-resolving cached management cache domains (#6518) 2026-06-23 17:55:57 +02:00
9 changed files with 588 additions and 510 deletions

View File

@@ -33,7 +33,7 @@
<br/>
<br/>
<strong>
🚀 <a href="https://careers.netbird.io">We are hiring! Join us at careers.netbird.io</a>
🚀 <a href="https://netbird.io/careers">We are hiring! Join us at https://netbird.io/careers</a>
</strong>
</p>

View File

@@ -51,13 +51,20 @@ type cachedRecord struct {
}
// Resolver caches critical NetBird infrastructure domains.
// records, refreshing, mgmtDomain and serverDomains are all guarded by mutex.
// records, refreshing, failedResolves, mgmtDomain and serverDomains are all
// guarded by mutex.
type Resolver struct {
records map[dns.Question]*cachedRecord
mgmtDomain *domain.Domain
serverDomains *dnsconfig.ServerDomains
mutex sync.RWMutex
// failedResolves records the last failed initial resolve per domain so a
// domain that never resolves isn't retried on every server-domains update
// until refreshBackoff elapses. Entries are cleared on success and pruned
// to the current server-domains set.
failedResolves map[domain.Domain]time.Time
chain ChainResolver
chainMaxPriority int
refreshGroup singleflight.Group
@@ -76,9 +83,10 @@ type Resolver struct {
// NewResolver creates a new management domains cache resolver.
func NewResolver() *Resolver {
return &Resolver{
records: make(map[dns.Question]*cachedRecord),
refreshing: make(map[dns.Question]*atomic.Bool),
cacheTTL: resolveCacheTTL(),
records: make(map[dns.Question]*cachedRecord),
refreshing: make(map[dns.Question]*atomic.Bool),
failedResolves: make(map[domain.Domain]time.Time),
cacheTTL: resolveCacheTTL(),
}
}
@@ -173,7 +181,9 @@ func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
// AddDomain resolves a domain and stores its A/AAAA records in the cache.
// A family that resolves NODATA (nil err, zero records) evicts any stale
// entry for that qtype.
// entry for that qtype. When one family hard-errors while the other succeeds,
// the resolved family is still cached but AddDomain returns an error so the
// caller retries the incomplete resolve rather than treating it as complete.
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
@@ -203,6 +213,10 @@ func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
log.Debugf("added/updated domain=%s with %d A records and %d AAAA records",
d.SafeString(), len(aRecords), len(aaaaRecords))
if errA != nil || errAAAA != nil {
return fmt.Errorf("resolve %s: incomplete, a family failed: %w", d.SafeString(), errors.Join(errA, errAAAA))
}
return nil
}
@@ -462,6 +476,7 @@ func (m *Resolver) RemoveDomain(d domain.Domain) error {
delete(m.records, qAAAA)
delete(m.refreshing, qA)
delete(m.refreshing, qAAAA)
delete(m.failedResolves, d)
log.Debugf("removed domain=%s from cache", d.SafeString())
return nil
@@ -505,6 +520,7 @@ func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dn
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
currentDomains := m.GetCachedDomains()
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
m.pruneFailedResolves(allDomains)
}
m.addNewDomains(ctx, newDomains)
@@ -577,13 +593,85 @@ func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
return m.mgmtDomain != nil && domain == *m.mgmtDomain
}
// addNewDomains resolves and caches all domains from the update
// addNewDomains resolves and caches domains that are not yet in the cache,
// running the lookups concurrently. Domains already cached are skipped and left
// to the stale-while-revalidate refresh path, so a sync never re-resolves them
// synchronously: once NetBird owns the OS resolver the resolve runs through the
// handler chain and would otherwise dial the managed upstreams under the engine
// sync lock on every update.
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
var wg sync.WaitGroup
seen := make(map[domain.Domain]struct{}, len(newDomains))
for _, newDomain := range newDomains {
if err := m.AddDomain(ctx, newDomain); err != nil {
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
} else {
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
if _, dup := seen[newDomain]; dup {
continue
}
seen[newDomain] = struct{}{}
if !m.needsResolve(newDomain) {
continue
}
wg.Add(1)
go func(d domain.Domain) {
defer wg.Done()
if err := m.AddDomain(ctx, d); err != nil {
m.markResolveFailed(d)
log.Warnf("failed to add/update domain=%s: %v", d.SafeString(), err)
return
}
m.clearResolveFailed(d)
log.Debugf("added/updated management cache domain=%s", d.SafeString())
}(newDomain)
}
wg.Wait()
}
// needsResolve reports whether d should be resolved now. A recent failed or
// incomplete resolve gates retries on the backoff even when one family is
// already cached, so a transiently-failed family is retried instead of being
// treated as fully resolved. Otherwise a domain with any cached record is left
// to the stale-while-revalidate refresh path.
func (m *Resolver) needsResolve(d domain.Domain) bool {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
m.mutex.RLock()
defer m.mutex.RUnlock()
if failedAt, ok := m.failedResolves[d]; ok {
return time.Since(failedAt) >= refreshBackoff
}
for _, qtype := range []uint16{dns.TypeA, dns.TypeAAAA} {
q := dns.Question{Name: dnsName, Qtype: qtype, Qclass: dns.ClassINET}
if _, ok := m.records[q]; ok {
return false
}
}
return true
}
func (m *Resolver) markResolveFailed(d domain.Domain) {
m.mutex.Lock()
m.failedResolves[d] = time.Now()
m.mutex.Unlock()
}
func (m *Resolver) clearResolveFailed(d domain.Domain) {
m.mutex.Lock()
delete(m.failedResolves, d)
m.mutex.Unlock()
}
// pruneFailedResolves drops failure markers for domains no longer present in
// the server-domains set, keeping the map bounded to the current set (a
// failed-only domain has no cached record, so RemoveDomain never sees it).
func (m *Resolver) pruneFailedResolves(domains domain.List) {
m.mutex.Lock()
defer m.mutex.Unlock()
for d := range m.failedResolves {
if !slices.Contains(domains, d) {
delete(m.failedResolves, d)
}
}
}

View File

@@ -21,6 +21,7 @@ type fakeChain struct {
mu sync.Mutex
calls map[string]int
answers map[string][]dns.RR
qErr map[string]error
err error
hasRoot bool
onLookup func()
@@ -30,6 +31,7 @@ func newFakeChain() *fakeChain {
return &fakeChain{
calls: map[string]int{},
answers: map[string][]dns.RR{},
qErr: map[string]error{},
hasRoot: true,
}
}
@@ -47,6 +49,9 @@ func (f *fakeChain) ResolveInternal(ctx context.Context, msg *dns.Msg, maxPriori
f.calls[key]++
answers := f.answers[key]
err := f.err
if err == nil {
err = f.qErr[key]
}
onLookup := f.onLookup
f.mu.Unlock()
@@ -75,6 +80,12 @@ func (f *fakeChain) setAnswer(name string, qtype uint16, ip string) {
}
}
func (f *fakeChain) setErr(name string, qtype uint16, err error) {
f.mu.Lock()
defer f.mu.Unlock()
f.qErr[name+"|"+dns.TypeToString[qtype]] = err
}
func (f *fakeChain) callCount(name string, qtype uint16) int {
f.mu.Lock()
defer f.mu.Unlock()

View File

@@ -0,0 +1,183 @@
package mgmt
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/miekg/dns"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/shared/management/domain"
)
// A domain already in the cache must not be re-resolved on a subsequent server
// domains update; it is left to the stale-while-revalidate refresh path.
func TestResolver_UpdateFromServerDomains_SkipsCached(t *testing.T) {
r := NewResolver()
chain := newFakeChain()
chain.setAnswer("signal.example.com.", dns.TypeA, "10.0.0.2")
r.SetChainResolver(chain, 50)
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
_, err := r.UpdateFromServerDomains(context.Background(), sd)
require.NoError(t, err)
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
"first update must resolve the domain")
_, err = r.UpdateFromServerDomains(context.Background(), sd)
require.NoError(t, err)
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
"cached domain must not be re-resolved on a subsequent update")
}
// New domains in a single update must resolve concurrently rather than serially.
func TestResolver_AddNewDomains_ResolvesConcurrently(t *testing.T) {
r := NewResolver()
chain := newFakeChain()
var inflight, maxInflight atomic.Int32
chain.onLookup = func() {
n := inflight.Add(1)
for {
old := maxInflight.Load()
if n <= old || maxInflight.CompareAndSwap(old, n) {
break
}
}
time.Sleep(50 * time.Millisecond)
inflight.Add(-1)
}
relays := []domain.Domain{"a.example.com", "b.example.com", "c.example.com", "d.example.com"}
for _, d := range relays {
chain.setAnswer(dns.Fqdn(string(d)), dns.TypeA, "10.0.0.2")
}
r.SetChainResolver(chain, 50)
start := time.Now()
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: relays})
require.NoError(t, err)
elapsed := time.Since(start)
assert.GreaterOrEqual(t, int(maxInflight.Load()), 2, "domains must resolve concurrently")
// Serial resolution of 4 domains would take at least 4*50ms; concurrent is far less.
assert.Less(t, elapsed, 300*time.Millisecond, "resolution should not be serial")
}
// A domain that fails to resolve must not be retried on every update; the
// failure backoff suppresses re-resolution until it expires.
func TestResolver_UpdateFromServerDomains_BacksOffFailures(t *testing.T) {
r := NewResolver()
chain := newFakeChain()
chain.err = errors.New("resolve boom")
r.SetChainResolver(chain, 50)
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
_, err := r.UpdateFromServerDomains(context.Background(), sd)
require.NoError(t, err)
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
"first update must attempt the resolve")
_, err = r.UpdateFromServerDomains(context.Background(), sd)
require.NoError(t, err)
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
"failed resolve must back off and not retry on the next update")
}
// A domain listed under more than one server-domain type (e.g. STUN and TURN on
// the same host) must be resolved once per update, not once per occurrence.
func TestResolver_AddNewDomains_DedupesDuplicateDomains(t *testing.T) {
r := NewResolver()
chain := newFakeChain()
chain.setAnswer("dup.example.com.", dns.TypeA, "10.0.0.9")
r.SetChainResolver(chain, 50)
sd := dnsconfig.ServerDomains{
Stuns: []domain.Domain{"dup.example.com"},
Turns: []domain.Domain{"dup.example.com"},
}
_, err := r.UpdateFromServerDomains(context.Background(), sd)
require.NoError(t, err)
assert.Equal(t, 1, chain.callCount("dup.example.com.", dns.TypeA),
"a domain appearing under multiple server-domain types must resolve once")
}
// A failure marker must be dropped once its domain leaves the server-domains set
// so the map stays bounded to the current set.
func TestResolver_UpdateFromServerDomains_PrunesFailedResolves(t *testing.T) {
r := NewResolver()
chain := newFakeChain()
chain.err = errors.New("resolve boom")
r.SetChainResolver(chain, 50)
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("gone.example.com")})
require.NoError(t, err)
r.mutex.RLock()
_, marked := r.failedResolves[domain.Domain("gone.example.com")]
r.mutex.RUnlock()
require.True(t, marked, "failed resolve must be recorded")
_, err = r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("other.example.com")})
require.NoError(t, err)
r.mutex.RLock()
_, stillMarked := r.failedResolves[domain.Domain("gone.example.com")]
r.mutex.RUnlock()
assert.False(t, stillMarked, "failure marker for a domain no longer in the set must be pruned")
}
// When one family hard-errors while the other resolves, the domain is cached
// for the working family but recorded as incomplete so the failed family is
// retried under backoff instead of being treated as fully resolved forever.
func TestResolver_AddNewDomains_RetriesPartialFamilyFailure(t *testing.T) {
d := domain.Domain("relay.example.com")
r := NewResolver()
chain := newFakeChain()
chain.setAnswer("relay.example.com.", dns.TypeA, "10.0.0.2")
chain.setErr("relay.example.com.", dns.TypeAAAA, errors.New("servfail"))
r.SetChainResolver(chain, 50)
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
require.NoError(t, err)
r.mutex.RLock()
_, aCached := r.records[dns.Question{Name: "relay.example.com.", Qtype: dns.TypeA, Qclass: dns.ClassINET}]
_, marked := r.failedResolves[d]
r.mutex.RUnlock()
require.True(t, aCached, "the working family must still be cached")
require.True(t, marked, "a partial failure must be recorded so the failed family is retried")
assert.False(t, r.needsResolve(d), "within the backoff window the domain is not retried")
r.mutex.Lock()
r.failedResolves[d] = time.Now().Add(-2 * refreshBackoff)
r.mutex.Unlock()
assert.True(t, r.needsResolve(d), "after the backoff elapses the domain is retried to pick up the missing family")
}
// A family that returns NODATA (legitimately absent, e.g. an IPv4-only host) is
// not a failure: the domain must not be marked for retry, otherwise it would be
// re-resolved on every sync.
func TestResolver_AddNewDomains_NodataIsNotFailure(t *testing.T) {
d := domain.Domain("v4only.example.com")
r := NewResolver()
chain := newFakeChain()
chain.setAnswer("v4only.example.com.", dns.TypeA, "10.0.0.2")
r.SetChainResolver(chain, 50)
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
require.NoError(t, err)
r.mutex.RLock()
_, marked := r.failedResolves[d]
r.mutex.RUnlock()
assert.False(t, marked, "a NODATA family must not be recorded as a failure")
assert.False(t, r.needsResolve(d), "an IPv4-only host must not be re-resolved on later syncs")
}

View File

@@ -41,7 +41,7 @@ func TestAffectedPeers_DependencyCoverageMatrix(t *testing.T) {
_, err := s.manager.SavePolicy(ctx, s.accountID, userID, peerToResourcePolicyByGroup(s.sourceGroupID, s.resourceGroupID), true)
require.NoError(t, err)
return affectedpeers.Change{ChangedPeerIDs: []string{s.routerPeerID}},
[]string{s.sourcePeerID, s.routerPeerID}, []string{s.unrelatedPeerID}
[]string{s.sourcePeerID}, []string{s.unrelatedPeerID}
},
},
{
@@ -106,8 +106,12 @@ func TestAffectedPeers_DependencyCoverageMatrix(t *testing.T) {
change, mustContain, mustExclude := r.build(t, s, ctx)
affected := resolveAffected(t, s.manager.Store, s.accountID, change)
assert.ElementsMatch(t, affected, mustContain, "expected peer to be affected")
assert.NotContains(t, affected, mustExclude, "peer must not be affected")
for _, id := range mustContain {
assert.Contains(t, affected, id, "expected peer to be affected")
}
for _, id := range mustExclude {
assert.NotContains(t, affected, id, "peer must not be affected")
}
})
}
}

View File

@@ -96,54 +96,33 @@ func affectedGroupID(i int) string { return fmt.Sprintf("affected-grp-%d", i)
func affectedGroupName(i int) string { return fmt.Sprintf("AffectedGroup%d", i) }
func TestCollectGroupChange_PolicyLinked(t *testing.T) {
manager, s, accountID, peerIDs, groupIDs := setupAffectedPeersTest(t)
manager, s, accountID, _, groupIDs := setupAffectedPeersTest(t)
ctx := context.Background()
_, err := manager.SavePolicy(ctx, accountID, userID, &types.Policy{
Enabled: true,
Rules: []*types.PolicyRule{
{
Enabled: true,
Sources: []string{groupIDs[0]},
Destinations: []string{groupIDs[1]},
SourceResource: types.Resource{ID: peerIDs[0], Type: types.ResourceTypePeer},
DestinationResource: types.Resource{ID: peerIDs[1], Type: types.ResourceTypePeer},
Bidirectional: true,
Action: types.PolicyTrafficActionAccept,
},
{
Enabled: true,
Sources: []string{groupIDs[0]},
Destinations: []string{groupIDs[1]},
SourceResource: types.Resource{ID: peerIDs[2], Type: types.ResourceTypeHost},
DestinationResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypeHost},
Bidirectional: true,
Action: types.PolicyTrafficActionAccept,
},
{
Enabled: true,
Sources: []string{groupIDs[0]},
Destinations: []string{groupIDs[1]},
SourceResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
DestinationResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
Bidirectional: true,
Action: types.PolicyTrafficActionAccept,
Enabled: true,
Sources: []string{groupIDs[0]},
Destinations: []string{groupIDs[1]},
Bidirectional: true,
Action: types.PolicyTrafficActionAccept,
},
},
}, true)
require.NoError(t, err)
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.ElementsMatch(t, directPeers, []string{peerIDs[1]})
groups, _ := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
assert.Contains(t, groups, groupIDs[0])
assert.Contains(t, groups, groupIDs[1])
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.ElementsMatch(t, directPeers, []string{peerIDs[0]})
groups, _ = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
assert.Contains(t, groups, groupIDs[0])
assert.Contains(t, groups, groupIDs[1])
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
groups, _ = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
assert.Empty(t, groups)
assert.Empty(t, directPeers)
}
func TestCollectGroupChange_PolicyWithDirectPeerResource(t *testing.T) {
@@ -154,44 +133,20 @@ func TestCollectGroupChange_PolicyWithDirectPeerResource(t *testing.T) {
Enabled: true,
Rules: []*types.PolicyRule{
{
Enabled: true,
Sources: []string{groupIDs[0]},
SourceResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypePeer},
DestinationResource: types.Resource{ID: peerIDs[4], Type: types.ResourceTypePeer},
Destinations: []string{groupIDs[1]},
Action: types.PolicyTrafficActionAccept,
},
{
Enabled: true,
Sources: []string{groupIDs[0]},
SourceResource: types.Resource{ID: peerIDs[1], Type: types.ResourceTypeHost},
DestinationResource: types.Resource{ID: peerIDs[2], Type: types.ResourceTypeHost},
Destinations: []string{groupIDs[1]},
Action: types.PolicyTrafficActionAccept,
},
{
Enabled: true,
Sources: []string{groupIDs[0]},
SourceResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
DestinationResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
Destinations: []string{groupIDs[1]},
Action: types.PolicyTrafficActionAccept,
Enabled: true,
Sources: []string{groupIDs[0]},
SourceResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypePeer},
Destinations: []string{groupIDs[1]},
Action: types.PolicyTrafficActionAccept,
},
},
}, true)
require.NoError(t, err)
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.ElementsMatch(t, directPeers, []string{peerIDs[4]})
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.ElementsMatch(t, directPeers, []string{peerIDs[3]})
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
assert.Empty(t, groups)
assert.Empty(t, directPeers)
assert.Contains(t, groups, groupIDs[0])
assert.Contains(t, groups, groupIDs[1])
assert.Contains(t, directPeers, peerIDs[3])
}
func TestCollectGroupChange_PolicyWithNonPeerResource_NoDirectPeers(t *testing.T) {
@@ -213,7 +168,8 @@ func TestCollectGroupChange_PolicyWithNonPeerResource_NoDirectPeers(t *testing.T
require.NoError(t, err)
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.Contains(t, groups, groupIDs[0])
assert.Contains(t, groups, groupIDs[1])
assert.Empty(t, directPeers, "non-peer resources should not produce direct peer IDs")
}
@@ -417,11 +373,17 @@ func TestCollectGroupChange_MultipleEntities(t *testing.T) {
require.NoError(t, err)
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
assert.Contains(t, groups, groupIDs[0])
assert.Contains(t, groups, groupIDs[1])
assert.NotContains(t, groups, groupIDs[2])
assert.NotContains(t, groups, groupIDs[3])
assert.Empty(t, directPeers)
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[3]})
assert.ElementsMatch(t, groups, []string{groupIDs[2], groupIDs[3]})
assert.Contains(t, groups, groupIDs[2])
assert.Contains(t, groups, groupIDs[3])
assert.NotContains(t, groups, groupIDs[0])
assert.NotContains(t, groups, groupIDs[1])
assert.Empty(t, directPeers)
}
@@ -490,9 +452,8 @@ func TestResolveAffectedPeers_PolicyBetweenTwoGroups(t *testing.T) {
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[1]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1]}, result)
// peerIDs[2] is unrelated to the route; only its own map can change.
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[2]})
assert.ElementsMatch(t, []string{peerIDs[2]}, result)
assert.Empty(t, result)
}
func TestResolveAffectedPeers_PolicyThreeGroups(t *testing.T) {
@@ -513,7 +474,7 @@ func TestResolveAffectedPeers_PolicyThreeGroups(t *testing.T) {
require.NoError(t, err)
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[2]}, result)
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2]}, result)
}
func TestResolveAffectedPeers_RoutePeerGroups(t *testing.T) {
@@ -545,9 +506,8 @@ func TestResolveAffectedPeers_RoutePeerGroups(t *testing.T) {
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[1]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1]}, result)
// peerIDs[2] is in no policy; only its own map can change, so it refreshes itself.
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[2]})
assert.ElementsMatch(t, []string{peerIDs[2]}, result)
assert.Empty(t, result)
}
func TestResolveAffectedPeers_RouteWithDirectPeer(t *testing.T) {
@@ -604,9 +564,9 @@ func TestResolveAffectedPeers_RouteWithAccessControlGroups(t *testing.T) {
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[2]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2]}, result)
// peer3 is unrelated to the route; only its own map can change.
// peer3 is unrelated
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[3]})
assert.ElementsMatch(t, []string{peerIDs[3]}, result)
assert.Empty(t, result)
}
func TestResolveAffectedPeers_NetworkRouter(t *testing.T) {
@@ -699,13 +659,9 @@ func TestResolveAffectedPeers_PeerInMultipleGroups(t *testing.T) {
}, true)
require.NoError(t, err)
// peer0 is in group0 AND group1, so both policies apply. A peer change folds
// only the changed peer plus the opposite side of each rule: group2 (peer2) via
// the group0 policy and group3 (peer3) via the group1 policy. peer1, a co-member
// of group1, is a sibling of the changed peer and must NOT refresh.
// peer0 is in group0 AND group1, so both policies apply
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[2], peerIDs[3]}, result)
assert.NotContains(t, result, peerIDs[1], "co-member of the changed peer's group must not refresh")
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2], peerIDs[3]}, result)
}
func TestResolveAffectedPeers_MultipleChangedPeers(t *testing.T) {
@@ -741,7 +697,7 @@ func TestResolveAffectedPeers_MultipleChangedPeers(t *testing.T) {
require.NoError(t, err)
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0], peerIDs[2]})
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[2], peerIDs[1], peerIDs[3]}, result)
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2], peerIDs[3]}, result)
}
func TestResolveAffectedPeers_SharedGroupAcrossPolicyAndRoute(t *testing.T) {
@@ -898,9 +854,8 @@ func TestAffectedPeers_IsolatedPolicies(t *testing.T) {
assert.NotContains(t, result, peerIDs[0])
assert.NotContains(t, result, peerIDs[1])
// peerIDs[4] is in neither isolated policy; only its own map can change.
result = manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[4]})
assert.ElementsMatch(t, []string{peerIDs[4]}, result)
assert.Empty(t, result)
}
func TestAffectedPeers_IsolatedRouteAndPolicy(t *testing.T) {
@@ -1022,13 +977,12 @@ func TestAffectedPeers_GroupUpdateOnlyAffectsLinkedPeers(t *testing.T) {
})
}
// A peer in no policy/route refreshes only itself — no other peer is affected.
func TestAffectedPeers_UnlinkedPeerChange_RefreshesSelfOnly(t *testing.T) {
func TestAffectedPeers_UnlinkedGroupChange_NoUpdates(t *testing.T) {
manager, s, accountID, peerIDs, _ := setupAffectedPeersTest(t)
ctx := context.Background()
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0]})
assert.ElementsMatch(t, []string{peerIDs[0]}, result)
assert.Empty(t, result)
}
// TestAffectedPeers_PolicyChange_UnrelatedPeerNoUpdate verifies that creating/deleting a

View File

@@ -61,8 +61,7 @@ func Load(ctx context.Context, s store.Store, accountID string, c Change) (*Snap
// loadCollections reads the policy/route/nameserver/dns/router/resource/proxy
// collections a Change can touch, gated to what the walk needs.
func (snap *Snapshot) loadCollections(ctx context.Context, s store.Store, accountID string, c Change) error {
// LinkGroups drive the same policy/route/dns walk as a changed group or peer.
hasGroupOrPeerChange := len(c.ChangedGroupIDs) > 0 || len(c.ChangedPeerIDs) > 0 || len(c.LinkGroups) > 0 || len(c.Resources) > 0
hasGroupOrPeerChange := len(c.ChangedGroupIDs) > 0 || len(c.ChangedPeerIDs) > 0 || len(c.Resources) > 0
hasNetworkObject := len(c.Routers) > 0 || len(c.Resources) > 0 || len(c.Networks) > 0
// the resource<->router bridge can fire for any of these
needsRoutersResources := hasGroupOrPeerChange || len(c.PostureCheckIDs) > 0 || len(c.Policies) > 0 || hasNetworkObject
@@ -77,7 +76,7 @@ func (snap *Snapshot) loadCollections(ctx context.Context, s store.Store, accoun
return err
}
}
if len(c.ChangedGroupIDs) > 0 || len(c.ChangedPeerIDs) > 0 || len(c.LinkGroups) > 0 {
if len(c.ChangedGroupIDs) > 0 || len(c.ChangedPeerIDs) > 0 {
if err := snap.loadDNS(ctx, s, accountID); err != nil {
return err
}
@@ -175,24 +174,6 @@ type Change struct {
// folded in — but only when the group is linked (an unlinked group has no map
// impact), matching how current members are handled.
RemovedPeersByGroup map[string][]string
// OutputPeerIDs are peers folded straight into the result without seeding their
// group memberships into the walk. Use for the peer whose group membership changed:
// the peer itself must refresh, but its OTHER groups did not change, so they must
// not be walked. Contrast ChangedPeerIDs, which seeds ALL of the peer's groups
// (correct when the peer's own attributes changed, e.g. IP/status).
OutputPeerIDs []string
// LinkGroups are groups used ONLY to match policies/routes/routers and walk to the
// OPPOSITE side — they are never expanded to their own members. Use this when a
// peer's group membership changed: pass the peer in ChangedPeerIDs and its
// group(s) here. The opposite side of the policies the group participates in
// refreshes, but the group's other members (siblings) do not — nothing changed for
// them. For an intra-group policy (A→A) the opposite side IS the group, so its
// members still refresh via the opposite-side fold, exactly when they genuinely
// gain/lose the changed peer. Unlike ChangedGroupIDs, a LinkGroup is not added to
// the output, so a one-sided membership change never wakes the whole group.
LinkGroups []string
}
func (c Change) isEmpty() bool {
@@ -205,9 +186,7 @@ func (c Change) isEmpty() bool {
len(c.Networks) == 0 &&
len(c.PostureCheckIDs) == 0 &&
len(c.DistributionGroupIDs) == 0 &&
len(c.RemovedPeersByGroup) == 0 &&
len(c.LinkGroups) == 0 &&
len(c.OutputPeerIDs) == 0
len(c.RemovedPeersByGroup) == 0
}
// Expand returns the deduplicated affected peer IDs from the preloaded Snapshot,
@@ -218,8 +197,8 @@ func (snap *Snapshot) Expand(ctx context.Context, accountID string, c Change) []
return nil
}
r := newResolver(ctx, snap, accountID, c)
log.WithContext(ctx).Tracef("affectedpeers expand start: account=%s changedGroups=%v changedPeers=%v linkGroups=%v policies=%d routes=%d routers=%d resources=%d networks=%d postureChecks=%v distributionGroups=%v",
accountID, c.ChangedGroupIDs, c.ChangedPeerIDs, c.LinkGroups, len(c.Policies), len(c.Routes), len(c.Routers), len(c.Resources), len(c.Networks), c.PostureCheckIDs, c.DistributionGroupIDs)
log.WithContext(ctx).Tracef("affectedpeers expand start: account=%s changedGroups=%v changedPeers=%v policies=%d routes=%d routers=%d resources=%d networks=%d postureChecks=%v distributionGroups=%v",
accountID, c.ChangedGroupIDs, c.ChangedPeerIDs, len(c.Policies), len(c.Routes), len(c.Routers), len(c.Resources), len(c.Networks), c.PostureCheckIDs, c.DistributionGroupIDs)
r.walk()
return r.expand()
}
@@ -237,84 +216,57 @@ func Collect(ctx context.Context, s store.Store, accountID string, c Change) (gr
}
r := newResolver(ctx, snap, accountID, c)
r.walk()
return setToSlice(r.affectedGroups), setToSlice(r.affectedPeers)
return setToSlice(r.groupSet), setToSlice(r.peerSet)
}
func newResolver(ctx context.Context, snap *Snapshot, accountID string, c Change) *resolver {
r := &resolver{
ctx: ctx,
snap: snap,
accountID: accountID,
change: c,
linkGroups: toSet(c.ChangedGroupIDs),
outputGroups: toSet(c.ChangedGroupIDs),
changedPeers: toSet(c.ChangedPeerIDs),
affectedGroups: make(map[string]struct{}),
affectedPeers: make(map[string]struct{}),
ctx: ctx,
snap: snap,
accountID: accountID,
change: c,
changedGroupSet: toSet(c.ChangedGroupIDs),
changedPeerSet: toSet(c.ChangedPeerIDs),
groupSet: make(map[string]struct{}),
peerSet: make(map[string]struct{}),
networkIDs: make(map[string]struct{}),
}
// LinkGroups match policies/routes to find the opposite side but are NOT output:
// they go into linkGroups only, never outputGroups, so their members never fold in.
addAll(r.linkGroups, c.LinkGroups)
// Resolve each changed peer to its groups here so callers pass only ChangedPeerIDs.
r.seedChangedGroupsFromPeers()
r.matchedPolicies = append(r.matchedPolicies, c.Policies...)
return r
}
// seedChangedGroupsFromPeers adds each changed peer's groups to linkGroups so
// seedChangedGroupsFromPeers adds each changed peer's groups to changedGroupSet so
// the group-driven walkers fire for memberships, not just direct peer references.
// These seeded groups are for MATCHING only — folding the changed entity's own
// side is gated on outputGroups (the caller-reported groups), so a seeded group
// never folds its whole membership; only the changed peer itself folds in.
func (r *resolver) seedChangedGroupsFromPeers() {
if len(r.changedPeers) == 0 {
if len(r.changedPeerSet) == 0 {
return
}
for groupID, members := range r.snap.groupPeers {
for pID := range r.changedPeers {
for pID := range r.changedPeerSet {
if _, ok := members[pID]; ok {
r.linkGroups[groupID] = struct{}{}
r.changedGroupSet[groupID] = struct{}{}
break
}
}
}
}
// policySide selects which side of a policy rule to walk.
type policySide int
const (
sideSource policySide = iota
sideDestination
)
func (s policySide) opposite() policySide {
if s == sideSource {
return sideDestination
}
return sideSource
}
// walk resolves affected peers in two buckets, by how far each change propagates.
//
// BOTH-SIDES — the rule itself changed (an explicit policy edit, or a policy whose
// posture check changed). Source AND destination refresh, so each such policy is
// walked on both sides.
//
// OPPOSITE-SIDE — an endpoint moved but no rule changed. For each policy the change
// touches we fold only the side AWAY from the change:
// - a changed peer/group sits ON a policy side -> fold the opposite side;
// - a changed router/resource/network sits on a NETWORK -> fold the SOURCE side of
// the policies whose destination reaches it (and the routers it implies).
//
// Routes, nameserver groups, DNS and embedded-proxy services distribute to their own
// member peers, outside the policy graph, and are folded here too.
func (r *resolver) walk() {
for _, policy := range r.bothSidesPolicies() {
r.foldPolicySide(policy, sideSource)
r.foldPolicySide(policy, sideDestination)
}
r.collectFromExplicitPolicies()
r.collectFromExplicitRoutes(r.change.Routes)
r.collectFromExplicitRouters(r.change.Routers)
r.collectFromExplicitResources(r.change.Resources)
r.collectFromExplicitNetworks(r.change.Networks)
r.collectFromPostureChecks(r.change.PostureCheckIDs)
if len(r.linkGroups) > 0 || len(r.changedPeers) > 0 {
// Distribution groups (nameserver/DNS) affect only their member peers: fold them
// straight into groupSet so expand() maps them to members, without the policy/
// route walk that changedGroupSet would trigger.
addAll(r.groupSet, r.change.DistributionGroupIDs)
if len(r.changedGroupSet) > 0 || len(r.changedPeerSet) > 0 {
r.collectFromPolicies()
r.collectFromRoutes()
r.collectFromNameServers()
@@ -323,31 +275,7 @@ func (r *resolver) walk() {
r.collectFromProxyServices()
}
r.collectFromChangedRoutes(r.change.Routes)
r.collectFromChangedRouters(r.change.Routers)
r.collectFromChangedResources(r.change.Resources)
r.collectFromChangedNetworks(r.change.Networks)
// The explicitly changed peers always refresh their own maps. OnPeersUpdated only
// refreshes the resolver's output (it ignores the separately-passed changed peers),
// so the changed peer reaches its own new map only via here. An offline/deleted
// peer in the set is filtered downstream (filterConnectedAffectedPeers).
addAll(r.affectedPeers, setToSlice(r.changedPeers))
// OutputPeerIDs refresh themselves too, but unlike changedPeers their group
// memberships were not seeded into the walk (only the changed group was).
addAll(r.affectedPeers, r.change.OutputPeerIDs)
// Distribution groups (nameserver/DNS) affect only their member peers: fold them
// straight into affectedGroups so expand() maps them to members, without the
// policy/route walk that linkGroups would trigger.
addAll(r.affectedGroups, r.change.DistributionGroupIDs)
}
// bothSidesPolicies are the policies whose rule changed: the explicitly edited ones
// plus those gated by a changed posture check. walk folds both their sides.
func (r *resolver) bothSidesPolicies() []*types.Policy {
policies := append([]*types.Policy(nil), r.change.Policies...)
return r.appendPoliciesForPostureChecks(policies, r.change.PostureCheckIDs)
r.collectResourceRouterBridge()
}
type resolver struct {
@@ -356,25 +284,14 @@ type resolver struct {
accountID string
change Change
// Inputs — what changed. Set once at construction, read-only during the walk
// (except linkGroups, which collectFromExplicitResources also seeds).
//
// linkGroups is the MATCH set: caller-changed groups the groups of changed
// peers changed-resource groups. A rule/route/router matches the change when
// one of its groups is here — used only to find the opposite side to fold.
//
// outputGroups is the FOLD-WHOLE-GROUP set: ONLY Change.ChangedGroupIDs. When a
// matched group is here, its whole membership is affected. A peer-seeded group
// is in linkGroups but NOT outputGroups, so it folds only the changed peer
// (changedPeers), never its siblings.
linkGroups map[string]struct{}
outputGroups map[string]struct{}
changedPeers map[string]struct{}
changedGroupSet map[string]struct{}
changedPeerSet map[string]struct{}
// Outputs — the answer. The only sets the walk accumulates into. affectedGroups
// is expanded to its member peers in expand().
affectedGroups map[string]struct{}
affectedPeers map[string]struct{}
groupSet map[string]struct{}
peerSet map[string]struct{}
matchedPolicies []*types.Policy
networkIDs map[string]struct{}
}
func (r *resolver) policies() []*types.Policy { return r.snap.policies }
@@ -384,10 +301,10 @@ func (r *resolver) networkResources() []*resourceTypes.NetworkResource { return
func (r *resolver) networkRouters() []*routerTypes.NetworkRouter { return r.snap.routers }
// peerIDsForGroups maps a group set to its member peer IDs via the preloaded index.
func (r *resolver) peerIDsForGroups(groups map[string]struct{}) []string {
func (r *resolver) peerIDsForGroups(groupSet map[string]struct{}) []string {
seen := make(map[string]struct{})
var ids []string
for gID := range groups {
for gID := range groupSet {
for pID := range r.snap.groupPeers[gID] {
if _, ok := seen[pID]; ok {
continue
@@ -400,25 +317,25 @@ func (r *resolver) peerIDsForGroups(groups map[string]struct{}) []string {
}
func (r *resolver) expand() []string {
peerIDs := r.peerIDsForGroups(r.affectedGroups)
peerIDs := r.peerIDsForGroups(r.groupSet)
log.WithContext(r.ctx).Tracef("affectedpeers expand: account=%s affectedGroups=%v -> %d group-member peers; direct peers=%v",
r.accountID, setToSlice(r.affectedGroups), len(peerIDs), setToSlice(r.affectedPeers))
r.accountID, setToSlice(r.groupSet), len(peerIDs), setToSlice(r.peerSet))
seen := make(map[string]struct{}, len(peerIDs))
for _, id := range peerIDs {
seen[id] = struct{}{}
}
for id := range r.affectedPeers {
for id := range r.peerSet {
if _, ok := seen[id]; !ok {
peerIDs = append(peerIDs, id)
seen[id] = struct{}{}
}
}
// Fold in removed peers only when their group is linked (in affectedGroups).
// Fold in removed peers only when their group is linked (in groupSet).
for groupID, removed := range r.change.RemovedPeersByGroup {
if _, linked := r.affectedGroups[groupID]; !linked {
if _, linked := r.groupSet[groupID]; !linked {
continue
}
for _, id := range removed {
@@ -434,300 +351,169 @@ func (r *resolver) expand() []string {
return peerIDs
}
// ruleSideGroups / ruleSideResource return the groups and the resource on the given
// side of a rule.
func ruleSideGroups(rule *types.PolicyRule, side policySide) []string {
if side == sideDestination {
return rule.Destinations
func (r *resolver) collectFromExplicitPolicies() {
for _, policy := range r.matchedPolicies {
if policy == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromExplicitPolicies: changed policy %s (%s) -> folding rule groups %v + direct peers",
policy.ID, policy.Name, policy.RuleGroups())
addAll(r.groupSet, policy.RuleGroups())
collectPolicyDirectPeers(policy, r.peerSet)
}
return rule.Sources
}
func ruleSideResource(rule *types.PolicyRule, side policySide) types.Resource {
if side == sideDestination {
return rule.DestinationResource
}
return rule.SourceResource
}
// foldPolicySide folds one side of a policy down to affected peers: its groups
// (resolved to members in expand) and its direct peer. When the side is the
// DESTINATION and references a network resource (directly or via a destination
// group's resources), it also folds the routers that serve that resource's network
// — a destination resource is reached through its routers. A resource on the SOURCE
// side routes to nobody (GetPoliciesForNetworkResource matches destinations only),
// so the router hop is destination-only.
func (r *resolver) foldPolicySide(policy *types.Policy, side policySide) {
if policy == nil {
return
}
for _, rule := range policy.Rules {
addAll(r.affectedGroups, ruleSideGroups(rule, side))
res := ruleSideResource(rule, side)
if res.Type == types.ResourceTypePeer && res.ID != "" {
r.affectedPeers[res.ID] = struct{}{}
func (r *resolver) collectFromExplicitRoutes(routes []*route.Route) {
for _, rt := range routes {
if rt == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromExplicitRoutes: changed route %s -> folding groups=%v peerGroups=%v accessControlGroups=%v peer=%q",
rt.ID, rt.Groups, rt.PeerGroups, rt.AccessControlGroups, rt.Peer)
addAll(r.groupSet, rt.Groups, rt.PeerGroups, rt.AccessControlGroups)
if rt.Peer != "" {
r.peerSet[rt.Peer] = struct{}{}
}
}
if side == sideDestination {
r.foldRoutersForResources(r.policyDestinationResourceIDs(policy))
}
// collectFromExplicitRouters folds changed routers' peers and marks their networks
// for the bridge. Passing the old router keeps a repointed router's previous peers
// affected without a post-commit read.
func (r *resolver) collectFromExplicitRouters(routers []*routerTypes.NetworkRouter) {
for _, router := range routers {
if router == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromExplicitRouters: changed router %s on network %s -> folding peerGroups=%v peer=%q and marking network for source bridge",
router.ID, router.NetworkID, router.PeerGroups, router.Peer)
addAll(r.groupSet, router.PeerGroups)
if router.Peer != "" {
r.peerSet[router.Peer] = struct{}{}
}
if router.NetworkID != "" {
r.networkIDs[router.NetworkID] = struct{}{}
}
}
}
// appendPoliciesForPostureChecks appends every policy that references a changed
// posture check (a rule change, so walk both sides).
func (r *resolver) appendPoliciesForPostureChecks(policies []*types.Policy, postureCheckIDs []string) []*types.Policy {
// collectFromExplicitResources marks changed resources' networks for the bridge and
// treats their group IDs as changed, so policies targeting the resource via a
// now-detached (old) group still refresh.
func (r *resolver) collectFromExplicitResources(resources []*resourceTypes.NetworkResource) {
for _, resource := range resources {
if resource == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromExplicitResources: changed resource %s on network %s -> marking network for bridge and treating groups %v as changed",
resource.ID, resource.NetworkID, resource.GroupIDs)
addAll(r.changedGroupSet, resource.GroupIDs)
if resource.NetworkID != "" {
r.networkIDs[resource.NetworkID] = struct{}{}
}
}
}
// collectFromExplicitNetworks marks changed networks for the bridge. A network has
// no groups/peers of its own.
func (r *resolver) collectFromExplicitNetworks(networks []*networkTypes.Network) {
for _, network := range networks {
if network == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromExplicitNetworks: changed network %s -> marking for bridge", network.ID)
if network.ID != "" {
r.networkIDs[network.ID] = struct{}{}
}
}
}
func (r *resolver) collectFromPostureChecks(postureCheckIDs []string) {
if len(postureCheckIDs) == 0 {
return policies
return
}
ids := toSet(postureCheckIDs)
for _, policy := range r.policies() {
if !policyReferencesPostureChecks(policy, ids) {
continue
}
log.WithContext(r.ctx).Tracef("appendPoliciesForPostureChecks: policy %s (%s) references changed posture checks %v -> both-sides policy",
policy.ID, policy.Name, postureCheckIDs)
policies = append(policies, policy)
log.WithContext(r.ctx).Tracef("collectFromPostureChecks: policy %s (%s) references changed posture checks %v -> folding rule groups %v + direct peers",
policy.ID, policy.Name, postureCheckIDs, policy.RuleGroups())
addAll(r.groupSet, policy.RuleGroups())
collectPolicyDirectPeers(policy, r.peerSet)
r.matchedPolicies = append(r.matchedPolicies, policy)
}
return policies
}
// collectFromPolicies folds, for every policy whose rule a changed group or peer
// touches, only the OPPOSITE side (down to peers, incl. destination routers), plus
// the changed entity's own side: the changed group's whole membership when the
// group itself changed (outputGroups), or the changed peer alone when matched via a
// peer-seeded group (never its co-members).
func (r *resolver) collectFromPolicies() {
for _, policy := range r.policies() {
for _, rule := range policy.Rules {
r.foldRuleSideIfChanged(policy, rule, sideSource)
r.foldRuleSideIfChanged(policy, rule, sideDestination)
}
}
}
// foldRuleSideIfChanged: when a changed group or direct peer sits on `side` of the
// rule, fold the opposite side fully (groups/peers + destination routers) and fold
// the changed entity's own side (the whole changed group, or the changed peer alone).
func (r *resolver) foldRuleSideIfChanged(policy *types.Policy, rule *types.PolicyRule, side policySide) {
nearGroups := ruleSideGroups(rule, side)
nearResource := ruleSideResource(rule, side)
matchedByGroup := anyInSet(nearGroups, r.linkGroups)
matchedByPeer := isDirectPeerInSet(nearResource, r.changedPeers)
if !matchedByGroup && !matchedByPeer {
return
}
// Opposite side, fully down to peers (a destination opposite also folds routers).
r.foldPolicySideForRule(policy, rule, side.opposite())
// Own side: fold the whole changed group's members only when the group itself
// changed (outputGroups). A peer-seeded or link-only group is not folded here —
// its siblings never refresh. The changed peers themselves are folded once, after
// the walk (see walk()).
for _, gID := range nearGroups {
if _, ok := r.outputGroups[gID]; ok {
r.affectedGroups[gID] = struct{}{}
}
}
// When the changed side IS a destination, the resources it targets are reached
// through their network's routers, so those routers refresh too (e.g. attaching a
// resource to a destination group, or a changed destination group/resource).
if side == sideDestination {
r.foldRoutersForResources(r.ruleDestinationResourceIDs(rule))
}
}
// foldPolicySideForRule folds one side of a single rule (groups + direct peer), and
// for a destination side the routers of that rule's destination resources.
func (r *resolver) foldPolicySideForRule(policy *types.Policy, rule *types.PolicyRule, side policySide) {
addAll(r.affectedGroups, ruleSideGroups(rule, side))
res := ruleSideResource(rule, side)
if res.Type == types.ResourceTypePeer && res.ID != "" {
r.affectedPeers[res.ID] = struct{}{}
}
if side == sideDestination {
r.foldRoutersForResources(r.ruleDestinationResourceIDs(rule))
}
}
// collectFromChangedRoutes folds an explicitly changed route's own groups and peer.
func (r *resolver) collectFromChangedRoutes(routes []*route.Route) {
for _, rt := range routes {
if rt == nil {
matchedByGroup := policyReferencesGroups(policy, r.changedGroupSet)
matchedByPeer := len(r.changedPeerSet) > 0 && policyReferencesDirectPeers(policy, r.changedPeerSet)
if !matchedByGroup && !matchedByPeer {
continue
}
log.WithContext(r.ctx).Tracef("collectFromChangedRoutes: changed route %s -> folding groups=%v peerGroups=%v accessControlGroups=%v peer=%q",
rt.ID, rt.Groups, rt.PeerGroups, rt.AccessControlGroups, rt.Peer)
addAll(r.affectedGroups, rt.Groups, rt.PeerGroups, rt.AccessControlGroups)
if rt.Peer != "" {
r.affectedPeers[rt.Peer] = struct{}{}
}
}
}
// collectFromChangedRouters: a changed router refreshes its OWN backing peer/groups
// (the changed entity) and the SOURCE side of every policy reaching a resource on
// its network (the router serves the whole network). Sibling routers on the network
// are independent and are NOT folded. Passing the old router state keeps a repointed
// router's previous backing affected without a post-commit read.
func (r *resolver) collectFromChangedRouters(routers []*routerTypes.NetworkRouter) {
for _, router := range routers {
if router == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromChangedRouters: changed router %s on network %s -> folding its own peerGroups=%v peer=%q + sources reaching network resources",
router.ID, router.NetworkID, router.PeerGroups, router.Peer)
addAll(r.affectedGroups, router.PeerGroups)
if router.Peer != "" {
r.affectedPeers[router.Peer] = struct{}{}
}
if router.NetworkID != "" {
r.foldPolicySourcesForResources(r.networkResourceIDs(router.NetworkID))
}
}
}
// collectFromChangedResources: a changed resource refreshes the SOURCE side of the
// policies targeting EXACTLY that resource — directly, or via one of the resource's
// own groups (oldnew across the change, so a now-detached group's sources still
// refresh) — plus the routers serving its network (the resource is reached through
// them). It does not touch sibling resources on the same network.
func (r *resolver) collectFromChangedResources(resources []*resourceTypes.NetworkResource) {
for _, resource := range resources {
if resource == nil {
continue
}
log.WithContext(r.ctx).Tracef("collectFromChangedResources: changed resource %s on network %s (groups %v) -> folding sources of policies targeting it + its network's routers",
resource.ID, resource.NetworkID, resource.GroupIDs)
r.foldPolicySourcesForResource(resource.ID, resource.GroupIDs)
if resource.NetworkID != "" {
r.foldRoutersOnNetworks(map[string]struct{}{resource.NetworkID: {}})
}
}
}
// foldPolicySourcesForResource folds the source side of every policy whose
// destination is the given resource — referenced directly, or via any of the given
// groups (the resource's own oldnew groups, which captures a detached group).
func (r *resolver) foldPolicySourcesForResource(resourceID string, groupIDs []string) {
groups := toSet(groupIDs)
for _, policy := range r.policies() {
if !policyTargetsResourceOrGroups(policy, resourceID, groups) {
continue
}
log.WithContext(r.ctx).Tracef("foldPolicySourcesForResource: policy %s (%s) targets changed resource %s -> folding its source groups/peers", policy.ID, policy.Name, resourceID)
collectPolicySources(policy, r.affectedGroups, r.affectedPeers)
}
}
// policyTargetsResourceOrGroups reports whether a policy's destination is the given
// resource directly, or one of the given destination groups.
func policyTargetsResourceOrGroups(policy *types.Policy, resourceID string, groups map[string]struct{}) bool {
if policy == nil {
return false
}
for _, rule := range policy.Rules {
if rule.DestinationResource.Type != types.ResourceTypePeer && rule.DestinationResource.ID == resourceID && resourceID != "" {
return true
}
if anyInSet(rule.Destinations, groups) {
return true
}
}
return false
}
// collectFromChangedNetworks: a changed network refreshes the SOURCE side of the
// policies reaching any of its resources, plus its routers. A network has no
// groups/peers of its own.
func (r *resolver) collectFromChangedNetworks(networks []*networkTypes.Network) {
for _, network := range networks {
if network == nil || network.ID == "" {
continue
}
log.WithContext(r.ctx).Tracef("collectFromChangedNetworks: changed network %s -> folding sources reaching its resources + its routers", network.ID)
resourceIDs := r.networkResourceIDs(network.ID)
r.foldPolicySourcesForResources(resourceIDs)
r.foldRoutersOnNetworks(map[string]struct{}{network.ID: {}})
}
}
// foldPolicySourcesForResources folds the source groups/peers of every policy whose
// destination targets one of resourceIDs (directly or via a destination group).
func (r *resolver) foldPolicySourcesForResources(resourceIDs map[string]struct{}) {
if len(resourceIDs) == 0 {
return
}
for _, policy := range r.policies() {
if r.policyTargetsResources(policy, resourceIDs) {
log.WithContext(r.ctx).Tracef("foldPolicySourcesForResources: policy %s (%s) targets a changed resource -> folding its source groups/peers", policy.ID, policy.Name)
collectPolicySources(policy, r.affectedGroups, r.affectedPeers)
}
log.WithContext(r.ctx).Tracef("collectFromPolicies: policy %s (%s) matched (byGroup=%t byPeer=%t) -> folding rule groups %v + direct peers",
policy.ID, policy.Name, matchedByGroup, matchedByPeer, policy.RuleGroups())
addAll(r.groupSet, policy.RuleGroups())
collectPolicyDirectPeers(policy, r.peerSet)
r.matchedPolicies = append(r.matchedPolicies, policy)
}
}
func (r *resolver) collectFromRoutes() {
for _, rt := range r.snap.routes {
matchedByGroup := anyInSet(rt.Groups, r.linkGroups) || anyInSet(rt.PeerGroups, r.linkGroups) || anyInSet(rt.AccessControlGroups, r.linkGroups)
matchedByPeer := rt.Peer != "" && len(r.changedPeers) > 0 && isInSet(rt.Peer, r.changedPeers)
matchedByGroup := anyInSet(rt.Groups, r.changedGroupSet) || anyInSet(rt.PeerGroups, r.changedGroupSet) || anyInSet(rt.AccessControlGroups, r.changedGroupSet)
matchedByPeer := rt.Peer != "" && len(r.changedPeerSet) > 0 && isInSet(rt.Peer, r.changedPeerSet)
if !matchedByGroup && !matchedByPeer {
continue
}
log.WithContext(r.ctx).Tracef("collectFromRoutes: route %s matched (byGroup=%t byPeer=%t) -> folding groups=%v peerGroups=%v accessControlGroups=%v peer=%q",
rt.ID, matchedByGroup, matchedByPeer, rt.Groups, rt.PeerGroups, rt.AccessControlGroups, rt.Peer)
addAll(r.affectedGroups, rt.Groups, rt.PeerGroups, rt.AccessControlGroups)
addAll(r.groupSet, rt.Groups, rt.PeerGroups, rt.AccessControlGroups)
if rt.Peer != "" {
r.affectedPeers[rt.Peer] = struct{}{}
r.peerSet[rt.Peer] = struct{}{}
}
}
}
func (r *resolver) collectFromNameServers() {
if len(r.linkGroups) == 0 {
if len(r.changedGroupSet) == 0 {
return
}
for _, ns := range r.snap.nsGroups {
if anyInSet(ns.Groups, r.linkGroups) {
if anyInSet(ns.Groups, r.changedGroupSet) {
log.WithContext(r.ctx).Tracef("collectFromNameServers: nameserver group %s references a changed group -> folding its groups %v", ns.ID, ns.Groups)
addAll(r.affectedGroups, ns.Groups)
addAll(r.groupSet, ns.Groups)
}
}
}
func (r *resolver) collectFromDNSSettings() {
if len(r.linkGroups) == 0 || r.snap.dnsSettings == nil {
if len(r.changedGroupSet) == 0 || r.snap.dnsSettings == nil {
return
}
for _, gID := range r.snap.dnsSettings.DisabledManagementGroups {
if _, ok := r.linkGroups[gID]; ok {
if _, ok := r.changedGroupSet[gID]; ok {
log.WithContext(r.ctx).Tracef("collectFromDNSSettings: changed group %s is in DisabledManagementGroups -> folding it", gID)
r.affectedGroups[gID] = struct{}{}
r.groupSet[gID] = struct{}{}
}
}
}
// collectFromNetworkRouters handles a changed group/peer that BACKS a router (the
// routing peer set moved): the router's own peers refresh and so do the sources of
// the policies reaching its network's resources. Sibling routers on the network are
// independent and are not folded.
func (r *resolver) collectFromNetworkRouters() {
for _, router := range r.networkRouters() {
matchedByGroup := anyInSet(router.PeerGroups, r.linkGroups)
matchedByPeer := router.Peer != "" && len(r.changedPeers) > 0 && isInSet(router.Peer, r.changedPeers)
matchedByGroup := anyInSet(router.PeerGroups, r.changedGroupSet)
matchedByPeer := router.Peer != "" && len(r.changedPeerSet) > 0 && isInSet(router.Peer, r.changedPeerSet)
if !matchedByGroup && !matchedByPeer {
continue
}
log.WithContext(r.ctx).Tracef("collectFromNetworkRouters: router %s on network %s matched (byGroup=%t byPeer=%t) -> folding its peerGroups=%v peer=%q + sources reaching network resources",
log.WithContext(r.ctx).Tracef("collectFromNetworkRouters: router %s on network %s matched (byGroup=%t byPeer=%t) -> folding peerGroups=%v peer=%q and marking network for source bridge",
router.ID, router.NetworkID, matchedByGroup, matchedByPeer, router.PeerGroups, router.Peer)
addAll(r.affectedGroups, router.PeerGroups)
addAll(r.groupSet, router.PeerGroups)
if router.Peer != "" {
r.affectedPeers[router.Peer] = struct{}{}
}
if router.NetworkID != "" {
r.foldPolicySourcesForResources(r.networkResourceIDs(router.NetworkID))
r.peerSet[router.Peer] = struct{}{}
}
r.networkIDs[router.NetworkID] = struct{}{}
}
}
@@ -748,34 +534,34 @@ func (r *resolver) collectFromProxyServices() {
continue
}
matchedByPeer := serviceMatchesChangedPeers(svc, proxyPeers, expanded)
matchedByAccessGroup := anyInSet(svc.AccessGroups, r.linkGroups)
matchedByAccessGroup := anyInSet(svc.AccessGroups, r.changedGroupSet)
if !matchedByPeer && !matchedByAccessGroup {
continue
}
log.WithContext(r.ctx).Tracef("collectFromProxyServices: service %s (cluster=%s) matched (byProxyOrTargetPeer=%t byAccessGroup=%t) -> folding %d proxy peers, peer targets and access groups %v",
svc.ID, svc.ProxyCluster, matchedByPeer, matchedByAccessGroup, len(proxyPeers), svc.AccessGroups)
for _, pid := range proxyPeers {
r.affectedPeers[pid] = struct{}{}
r.peerSet[pid] = struct{}{}
}
for _, target := range svc.Targets {
if target.TargetType == rpservice.TargetTypePeer && target.TargetId != "" {
r.affectedPeers[target.TargetId] = struct{}{}
r.peerSet[target.TargetId] = struct{}{}
}
}
addAll(r.affectedGroups, svc.AccessGroups)
addAll(r.groupSet, svc.AccessGroups)
}
}
func (r *resolver) expandChangedPeersWithGroups() map[string]struct{} {
if len(r.linkGroups) == 0 {
return r.changedPeers
if len(r.changedGroupSet) == 0 {
return r.changedPeerSet
}
ids := r.peerIDsForGroups(r.linkGroups)
ids := r.peerIDsForGroups(r.changedGroupSet)
if len(ids) == 0 {
return r.changedPeers
return r.changedPeerSet
}
merged := make(map[string]struct{}, len(r.changedPeers)+len(ids))
for id := range r.changedPeers {
merged := make(map[string]struct{}, len(r.changedPeerSet)+len(ids))
for id := range r.changedPeerSet {
merged[id] = struct{}{}
}
for _, id := range ids {
@@ -784,36 +570,54 @@ func (r *resolver) expandChangedPeersWithGroups() map[string]struct{} {
return merged
}
// foldRoutersForResources folds the routers serving the networks of the given
// resources (a destination resource is reached through its network's routers). It is
// the resource -> network -> router hop used by foldPolicySide for a destination.
func (r *resolver) foldRoutersForResources(resourceIDs map[string]struct{}) {
// collectResourceRouterBridge crosses between source peers and routing peers, which
// are reachable only via resource -> network -> router, not through the policy's own
// groups: source -> router (targeted resources' networks), then router -> source.
func (r *resolver) collectResourceRouterBridge() {
r.bridgeSourceToRouters()
r.bridgeRoutersToSources()
}
func (r *resolver) bridgeSourceToRouters() {
resourceIDs := r.policyDestinationResourceIDs(r.matchedPolicies...)
if len(resourceIDs) == 0 {
return
}
r.foldRoutersOnNetworks(r.resourceNetworkIDs(resourceIDs))
}
// ruleDestinationResourceIDs returns the destination resource IDs of a single rule:
// the direct DestinationResource plus the resources of its destination groups.
func (r *resolver) ruleDestinationResourceIDs(rule *types.PolicyRule) map[string]struct{} {
resourceIDs := make(map[string]struct{})
if rule.DestinationResource.Type != types.ResourceTypePeer && rule.DestinationResource.ID != "" {
resourceIDs[rule.DestinationResource.ID] = struct{}{}
networkIDs := r.resourceNetworkIDs(resourceIDs)
log.WithContext(r.ctx).Tracef("bridgeSourceToRouters: targeted resources %v -> networks %v (their routers become affected via the router->source pass)",
setToSlice(resourceIDs), setToSlice(networkIDs))
for id := range networkIDs {
r.networkIDs[id] = struct{}{}
}
r.addGroupResourceIDs(toSet(rule.Destinations), resourceIDs)
return resourceIDs
}
// networkResourceIDs returns the IDs of all resources on the given network.
func (r *resolver) networkResourceIDs(networkID string) map[string]struct{} {
func (r *resolver) bridgeRoutersToSources() {
if len(r.networkIDs) == 0 {
return
}
log.WithContext(r.ctx).Tracef("bridgeRoutersToSources: affected networks %v -> folding their routing peers and the source peers of policies targeting their resources",
setToSlice(r.networkIDs))
r.foldRoutersOnNetworks(r.networkIDs)
resourceIDs := make(map[string]struct{})
for _, resource := range r.networkResources() {
if resource.NetworkID == networkID {
if _, ok := r.networkIDs[resource.NetworkID]; ok {
resourceIDs[resource.ID] = struct{}{}
}
}
return resourceIDs
if len(resourceIDs) == 0 {
return
}
for _, policy := range r.policies() {
if r.policyTargetsResources(policy, resourceIDs) {
log.WithContext(r.ctx).Tracef("bridgeRoutersToSources: policy %s (%s) targets an affected-network resource -> folding its source groups/peers", policy.ID, policy.Name)
collectPolicySources(policy, r.groupSet, r.peerSet)
}
}
}
func (r *resolver) foldRoutersOnNetworks(networkIDs map[string]struct{}) {
@@ -823,9 +627,9 @@ func (r *resolver) foldRoutersOnNetworks(networkIDs map[string]struct{}) {
}
log.WithContext(r.ctx).Tracef("bridgeRoutersToSources: router %s serves affected network %s -> folding peerGroups=%v peer=%q",
router.ID, router.NetworkID, router.PeerGroups, router.Peer)
addAll(r.affectedGroups, router.PeerGroups)
addAll(r.groupSet, router.PeerGroups)
if router.Peer != "" {
r.affectedPeers[router.Peer] = struct{}{}
r.peerSet[router.Peer] = struct{}{}
}
}
}
@@ -910,26 +714,44 @@ func (r *resolver) addGroupResourceIDs(groupIDs map[string]struct{}, resourceIDs
}
}
func collectPolicyDirectPeers(policy *types.Policy, peers map[string]struct{}) {
func collectPolicyDirectPeers(policy *types.Policy, peerSet map[string]struct{}) {
for _, rule := range policy.Rules {
if rule.SourceResource.Type == types.ResourceTypePeer && rule.SourceResource.ID != "" {
peers[rule.SourceResource.ID] = struct{}{}
peerSet[rule.SourceResource.ID] = struct{}{}
}
if rule.DestinationResource.Type == types.ResourceTypePeer && rule.DestinationResource.ID != "" {
peers[rule.DestinationResource.ID] = struct{}{}
peerSet[rule.DestinationResource.ID] = struct{}{}
}
}
}
func collectPolicySources(policy *types.Policy, groups, peers map[string]struct{}) {
func collectPolicySources(policy *types.Policy, groupSet, peerSet map[string]struct{}) {
for _, rule := range policy.Rules {
addAll(groups, rule.Sources)
addAll(groupSet, rule.Sources)
if rule.SourceResource.Type == types.ResourceTypePeer && rule.SourceResource.ID != "" {
peers[rule.SourceResource.ID] = struct{}{}
peerSet[rule.SourceResource.ID] = struct{}{}
}
}
}
func policyReferencesGroups(policy *types.Policy, groupSet map[string]struct{}) bool {
for _, rule := range policy.Rules {
if anyInSet(rule.Sources, groupSet) || anyInSet(rule.Destinations, groupSet) {
return true
}
}
return false
}
func policyReferencesDirectPeers(policy *types.Policy, changedSet map[string]struct{}) bool {
for _, rule := range policy.Rules {
if isDirectPeerInSet(rule.SourceResource, changedSet) || isDirectPeerInSet(rule.DestinationResource, changedSet) {
return true
}
}
return false
}
func policyReferencesPostureChecks(policy *types.Policy, ids map[string]struct{}) bool {
for _, id := range policy.SourcePostureChecks {
if _, ok := ids[id]; ok {

View File

@@ -80,6 +80,26 @@ func TestChangeIsEmpty(t *testing.T) {
assert.False(t, Change{PostureCheckIDs: []string{"pc"}}.isEmpty())
}
func TestPolicyReferencesGroups(t *testing.T) {
policy := &types.Policy{Rules: []*types.PolicyRule{{Sources: []string{"g1", "g2"}, Destinations: []string{"g3"}}}}
assert.True(t, policyReferencesGroups(policy, map[string]struct{}{"g1": {}}))
assert.True(t, policyReferencesGroups(policy, map[string]struct{}{"g3": {}}))
assert.False(t, policyReferencesGroups(policy, map[string]struct{}{"g4": {}}))
assert.False(t, policyReferencesGroups(policy, map[string]struct{}{}))
}
func TestPolicyReferencesDirectPeers(t *testing.T) {
policy := &types.Policy{Rules: []*types.PolicyRule{{
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p1"},
DestinationResource: types.Resource{Type: types.ResourceTypeHost, ID: "r1"},
}}}
assert.True(t, policyReferencesDirectPeers(policy, map[string]struct{}{"p1": {}}))
assert.False(t, policyReferencesDirectPeers(policy, map[string]struct{}{"r1": {}}))
assert.False(t, policyReferencesDirectPeers(policy, map[string]struct{}{"p2": {}}))
}
func TestPolicyReferencesPostureChecks(t *testing.T) {
policy := &types.Policy{SourcePostureChecks: []string{"pc1", "pc2"}}

View File

@@ -520,12 +520,7 @@ func collectDeletableGroups(ctx context.Context, transaction store.Store, accoun
// GroupAddPeer appends peer to the group
func (am *DefaultAccountManager) GroupAddPeer(ctx context.Context, accountID, groupID, peerID string) error {
var snap *affectedpeers.Snapshot
// A membership change affects only the peer itself and the opposite side of THIS
// group's policies — not the group's other members, and not the peer's other
// groups. LinkGroups walks only this group (matched, not expanded); OutputPeerIDs
// refreshes the peer without seeding its other group memberships. For an
// intra-group policy the opposite side is the group, so its members still refresh.
change := affectedpeers.Change{OutputPeerIDs: []string{peerID}, LinkGroups: []string{groupID}}
change := affectedpeers.Change{ChangedGroupIDs: []string{groupID}}
err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err := transaction.AddPeerToGroup(ctx, accountID, peerID, groupID); err != nil {
@@ -591,11 +586,10 @@ func (am *DefaultAccountManager) GroupAddResource(ctx context.Context, accountID
// GroupDeletePeer removes peer from the group
func (am *DefaultAccountManager) GroupDeletePeer(ctx context.Context, accountID, groupID, peerID string) error {
var snap *affectedpeers.Snapshot
// Same as GroupAddPeer: the removed peer and the opposite side of THIS group's
// policies refresh, not the group's other members or the peer's other groups. The
// peer is no longer in the group's index, but LinkGroups still drives the
// opposite-side walk, and OutputPeerIDs refreshes the removed peer itself.
change := affectedpeers.Change{OutputPeerIDs: []string{peerID}, LinkGroups: []string{groupID}}
change := affectedpeers.Change{
ChangedGroupIDs: []string{groupID},
RemovedPeersByGroup: map[string][]string{groupID: {peerID}},
}
err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err := transaction.RemovePeerFromGroup(ctx, peerID, groupID); err != nil {
@@ -606,6 +600,8 @@ func (am *DefaultAccountManager) GroupDeletePeer(ctx context.Context, accountID,
return err
}
// The removed peer is carried in change.RemovedPeersByGroup and folded in
// only when the group is linked, so loading post-removal is correct.
var err error
if snap, err = affectedpeers.Load(ctx, transaction, accountID, change); err != nil {
return err