This commit is contained in:
pascal
2026-05-07 17:50:02 +02:00
parent fd99bc072d
commit 57529c7f18
4 changed files with 6 additions and 45 deletions

View File

@@ -44,7 +44,6 @@ type Controller struct {
EphemeralPeersManager ephemeral.Manager
accountUpdateLocks sync.Map
sendAccountUpdateLocks sync.Map
affectedPeerUpdateLocks sync.Map
updateAccountPeersBufferInterval atomic.Int64
// dnsDomain is used for peer resolution. This is appended to the peer's name
@@ -229,44 +228,6 @@ func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID strin
return nil
}
func (c *Controller) bufferSendUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
log.WithContext(ctx).Tracef("buffer sending update peers for account %s from %s", accountID, util.GetCallerName())
if c.accountManagerMetrics != nil {
c.accountManagerMetrics.CountUpdateAccountPeersTriggered(string(reason.Resource), string(reason.Operation))
}
bufUpd, _ := c.sendAccountUpdateLocks.LoadOrStore(accountID, &bufferUpdate{})
b := bufUpd.(*bufferUpdate)
if !b.mu.TryLock() {
b.update.Store(true)
return nil
}
if b.next != nil {
b.next.Stop()
}
go func() {
defer b.mu.Unlock()
_ = c.sendUpdateAccountPeers(ctx, accountID)
if !b.update.Load() {
return
}
b.update.Store(false)
if b.next == nil {
b.next = time.AfterFunc(time.Duration(c.updateAccountPeersBufferInterval.Load()), func() {
_ = c.sendUpdateAccountPeers(ctx, accountID)
})
return
}
b.next.Reset(time.Duration(c.updateAccountPeersBufferInterval.Load()))
}()
return nil
}
// UpdatePeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers.
func (c *Controller) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {

View File

@@ -481,12 +481,12 @@ func TestCollectPolicyAffectedGroups_MultiplePolicies(t *testing.T) {
{Sources: []string{"g1"}, Destinations: []string{"g2"}},
},
}
new := &types.Policy{
updated := &types.Policy{
Rules: []*types.PolicyRule{
{Sources: []string{"g3"}, Destinations: []string{"g4"}},
},
}
groups, _ := collectPolicyAffectedGroupsAndPeers(context.Background(), new, old)
groups, _ := collectPolicyAffectedGroupsAndPeers(context.Background(), updated, old)
assert.ElementsMatch(t, []string{"g1", "g2", "g3", "g4"}, groups)
}
@@ -548,11 +548,11 @@ func TestCollectRouteAffectedGroups_MultipleRoutes(t *testing.T) {
Groups: []string{"g1"},
Peer: "p1",
}
new := &route.Route{
updated := &route.Route{
Groups: []string{"g2"},
PeerGroups: []string{"g3"},
}
groups, directPeers := collectRouteAffectedGroupsAndPeers(context.Background(), new, old)
groups, directPeers := collectRouteAffectedGroupsAndPeers(context.Background(), updated, old)
assert.ElementsMatch(t, []string{"g1", "g2", "g3"}, groups)
assert.ElementsMatch(t, []string{"p1"}, directPeers)
}

View File

@@ -461,7 +461,7 @@ type resourceAffectedPeersData struct {
// loadResourceAffectedPeersData loads the data needed to determine affected peers within a transaction.
func loadResourceAffectedPeersData(ctx context.Context, transaction store.Store, accountID, networkID string, resourceGroupIDs []string) (*resourceAffectedPeersData, error) {
if len(resourceGroupIDs) == 0 {
return nil, nil
return &resourceAffectedPeersData{}, nil
}
policies, err := transaction.GetAccountPolicies(ctx, store.LockingStrengthNone, accountID)

View File

@@ -321,7 +321,7 @@ func loadRouterAffectedPeersData(ctx context.Context, transaction store.Store, a
}
if len(routerPeerGroups) == 0 && len(directPeerIDs) == 0 {
return nil, nil
return &routerAffectedPeersData{}, nil
}
resources, err := transaction.GetNetworkResourcesByNetID(ctx, store.LockingStrengthNone, accountID, networkID)