affected filtering on peers update

This commit is contained in:
pascal
2026-04-28 13:48:01 +02:00
parent 5a16c812fd
commit cefb37e920
11 changed files with 138 additions and 41 deletions

View File

@@ -150,7 +150,9 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
}
if expired {
err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID})
changedPeerIDs := []string{peer.ID}
affectedPeerIDs := am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, changedPeerIDs)
err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs)
if err != nil {
return fmt.Errorf("notify network map controller of peer update: %w", err)
}
@@ -334,7 +336,9 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
}
}
err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID})
changedPeerIDs := []string{peer.ID}
affectedPeerIDs := am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, changedPeerIDs)
err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs)
if err != nil {
return nil, fmt.Errorf("notify network map controller of peer update: %w", err)
}
@@ -492,6 +496,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
var peer *nbpeer.Peer
var settings *types.Settings
var eventsToStore []func()
var affectedPeerIDs []string
serviceID, err := am.serviceManager.GetServiceIDByTargetID(ctx, accountID, peerID)
if err != nil {
@@ -516,6 +521,8 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
return err
}
affectedPeerIDs = am.resolveAffectedPeersForPeerChanges(ctx, transaction, accountID, []string{peerID})
eventsToStore, err = deletePeers(ctx, am, transaction, accountID, userID, []*nbpeer.Peer{peer}, settings)
if err != nil {
return fmt.Errorf("failed to delete peer: %w", err)
@@ -539,7 +546,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
log.WithContext(ctx).Errorf("failed to delete peer %s from integrated validator: %v", peerID, err)
}
if err = am.networkMapController.OnPeersDeleted(ctx, accountID, []string{peerID}); err != nil {
if err = am.networkMapController.OnPeersDeleted(ctx, accountID, []string{peerID}, affectedPeerIDs); err != nil {
log.WithContext(ctx).Errorf("failed to delete peer %s from network map: %v", peerID, err)
}
@@ -863,7 +870,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
}
if err := am.networkMapController.OnPeersAdded(ctx, accountID, []string{newPeer.ID}); err != nil {
changedPeerIDs := []string{newPeer.ID}
affectedPeerIDs := am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, changedPeerIDs)
if err := am.networkMapController.OnPeersAdded(ctx, accountID, changedPeerIDs, affectedPeerIDs); err != nil {
log.WithContext(ctx).Errorf("failed to update network map cache for peer %s: %v", newPeer.ID, err)
}
@@ -946,7 +955,9 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
}
if isStatusChanged || sync.UpdateAccountPeers || (updated && (len(postureChecks) > 0 || versionChanged)) {
err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID})
changedPeerIDs := []string{peer.ID}
affectedPeerIDs := am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, changedPeerIDs)
err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs)
if err != nil {
return nil, nil, nil, 0, fmt.Errorf("notify network map controller of peer update: %w", err)
}
@@ -1073,7 +1084,9 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
}
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
err = am.networkMapController.OnPeersUpdated(ctx, accountID, []string{peer.ID})
changedPeerIDs := []string{peer.ID}
affectedPeerIDs := am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, changedPeerIDs)
err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs)
if err != nil {
return nil, nil, nil, fmt.Errorf("notify network map controller of peer update: %w", err)
}
@@ -1297,6 +1310,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
// UpdateAffectedPeers updates only the specified peers that belong to an account.
// Should be called when a change is known to affect only a subset of peers.
func (am *DefaultAccountManager) UpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) {
log.WithContext(ctx).Tracef("UpdateAffectedPeers: %d peers for account %s", len(peerIDs), accountID)
_ = am.networkMapController.UpdateAffectedPeers(ctx, accountID, peerIDs)
}
@@ -1310,6 +1324,7 @@ func (am *DefaultAccountManager) resolvePeerIDs(ctx context.Context, s store.Sto
}
if len(directPeerIDs) == 0 {
log.WithContext(ctx).Tracef("resolvePeerIDs: groups=%v -> %d peers", groupIDs, len(peerIDs))
return peerIDs
}
@@ -1323,6 +1338,8 @@ func (am *DefaultAccountManager) resolvePeerIDs(ctx context.Context, s store.Sto
seen[id] = struct{}{}
}
}
log.WithContext(ctx).Tracef("resolvePeerIDs: groups=%v + directPeers=%v -> %d peers", groupIDs, directPeerIDs, len(peerIDs))
return peerIDs
}
@@ -1332,6 +1349,25 @@ func (am *DefaultAccountManager) BufferUpdateAffectedPeers(ctx context.Context,
_ = am.networkMapController.BufferUpdateAffectedPeers(ctx, accountID, peerIDs)
}
// resolveAffectedPeersForPeerChanges resolves changed peer IDs into the full set of
// affected peers: finds groups containing the changed peers, walks all entity linkages,
// and resolves back to peer IDs.
func (am *DefaultAccountManager) resolveAffectedPeersForPeerChanges(ctx context.Context, s store.Store, accountID string, changedPeerIDs []string) []string {
groupIDs, err := s.GetGroupIDsByPeerIDs(ctx, accountID, changedPeerIDs)
if err != nil {
log.WithContext(ctx).Errorf("failed to get group IDs for changed peers: %v", err)
return nil
}
log.WithContext(ctx).Tracef("resolveAffectedPeersForPeerChanges: changedPeers=%v -> groups=%v", changedPeerIDs, groupIDs)
allGroupIDs, directPeerIDs := collectGroupChangeAffectedGroups(ctx, s, accountID, groupIDs)
result := am.resolvePeerIDs(ctx, s, accountID, allGroupIDs, directPeerIDs)
log.WithContext(ctx).Tracef("resolveAffectedPeersForPeerChanges: changedPeers=%v -> %d affected peers", changedPeerIDs, len(result))
return result
}
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
_ = am.networkMapController.BufferUpdateAccountPeers(ctx, accountID)
}