mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-03 15:46:38 +00:00
add to networks modules
This commit is contained in:
@@ -114,6 +114,7 @@ func (m *managerImpl) CreateResource(ctx context.Context, userID string, resourc
|
||||
}
|
||||
|
||||
var eventsToStore []func()
|
||||
var affectedData *resourceAffectedPeersData
|
||||
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
_, err = transaction.GetNetworkResourceByName(ctx, store.LockingStrengthNone, resource.AccountID, resource.Name)
|
||||
if err == nil {
|
||||
@@ -152,6 +153,11 @@ func (m *managerImpl) CreateResource(ctx context.Context, userID string, resourc
|
||||
return fmt.Errorf("failed to increment network serial: %w", err)
|
||||
}
|
||||
|
||||
affectedData, err = loadResourceAffectedPeersData(ctx, transaction, resource.AccountID, resource.NetworkID, resource.GroupIDs)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to load affected peers data: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -162,7 +168,9 @@ func (m *managerImpl) CreateResource(ctx context.Context, userID string, resourc
|
||||
event()
|
||||
}
|
||||
|
||||
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID)
|
||||
if affectedPeerIDs := m.resolveResourceAffectedPeers(ctx, resource.AccountID, affectedData); len(affectedPeerIDs) > 0 {
|
||||
go m.accountManager.UpdateAffectedPeers(ctx, resource.AccountID, affectedPeerIDs)
|
||||
}
|
||||
|
||||
return resource, nil
|
||||
}
|
||||
@@ -207,6 +215,7 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
|
||||
resource.Prefix = prefix
|
||||
|
||||
var eventsToStore []func()
|
||||
var affectedData *resourceAffectedPeersData
|
||||
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
network, err := transaction.GetNetworkByID(ctx, store.LockingStrengthUpdate, resource.AccountID, resource.NetworkID)
|
||||
if err != nil {
|
||||
@@ -232,6 +241,15 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
|
||||
return fmt.Errorf("failed to get network resource: %w", err)
|
||||
}
|
||||
|
||||
oldGroups, err := m.groupsManager.GetResourceGroupsInTransaction(ctx, transaction, store.LockingStrengthNone, oldResource.AccountID, oldResource.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get old resource groups: %w", err)
|
||||
}
|
||||
var oldGroupIDs []string
|
||||
for _, g := range oldGroups {
|
||||
oldGroupIDs = append(oldGroupIDs, g.ID)
|
||||
}
|
||||
|
||||
err = transaction.SaveNetworkResource(ctx, resource)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save network resource: %w", err)
|
||||
@@ -247,6 +265,11 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
|
||||
m.accountManager.StoreEvent(ctx, userID, resource.ID, resource.AccountID, activity.NetworkResourceUpdated, resource.EventMeta(network))
|
||||
})
|
||||
|
||||
affectedData, err = loadResourceAffectedPeersData(ctx, transaction, resource.AccountID, resource.NetworkID, append(resource.GroupIDs, oldGroupIDs...))
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to load affected peers data: %v", err)
|
||||
}
|
||||
|
||||
err = transaction.IncrementNetworkSerial(ctx, resource.AccountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to increment network serial: %w", err)
|
||||
@@ -270,7 +293,9 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
|
||||
}
|
||||
}()
|
||||
|
||||
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID)
|
||||
if affectedPeerIDs := m.resolveResourceAffectedPeers(ctx, resource.AccountID, affectedData); len(affectedPeerIDs) > 0 {
|
||||
go m.accountManager.UpdateAffectedPeers(ctx, resource.AccountID, affectedPeerIDs)
|
||||
}
|
||||
|
||||
return resource, nil
|
||||
}
|
||||
@@ -331,7 +356,22 @@ func (m *managerImpl) DeleteResource(ctx context.Context, accountID, userID, net
|
||||
}
|
||||
|
||||
var events []func()
|
||||
var affectedData *resourceAffectedPeersData
|
||||
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
groups, err := m.groupsManager.GetResourceGroupsInTransaction(ctx, transaction, store.LockingStrengthNone, accountID, resourceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get resource groups: %w", err)
|
||||
}
|
||||
var resourceGroupIDs []string
|
||||
for _, g := range groups {
|
||||
resourceGroupIDs = append(resourceGroupIDs, g.ID)
|
||||
}
|
||||
|
||||
affectedData, err = loadResourceAffectedPeersData(ctx, transaction, accountID, networkID, resourceGroupIDs)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to load affected peers data: %v", err)
|
||||
}
|
||||
|
||||
events, err = m.DeleteResourceInTransaction(ctx, transaction, accountID, userID, networkID, resourceID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete resource: %w", err)
|
||||
@@ -352,7 +392,9 @@ func (m *managerImpl) DeleteResource(ctx context.Context, accountID, userID, net
|
||||
event()
|
||||
}
|
||||
|
||||
go m.accountManager.UpdateAccountPeers(ctx, accountID)
|
||||
if affectedPeerIDs := m.resolveResourceAffectedPeers(ctx, accountID, affectedData); len(affectedPeerIDs) > 0 {
|
||||
go m.accountManager.UpdateAffectedPeers(ctx, accountID, affectedPeerIDs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -399,6 +441,127 @@ func (m *managerImpl) DeleteResourceInTransaction(ctx context.Context, transacti
|
||||
return eventsToStore, nil
|
||||
}
|
||||
|
||||
// resourceAffectedPeersData holds data loaded inside a transaction for affected peer resolution.
|
||||
type resourceAffectedPeersData struct {
|
||||
resourceGroupIDs []string
|
||||
policies []*nbtypes.Policy
|
||||
routerPeerGroups []string
|
||||
routerDirectPeers []string
|
||||
}
|
||||
|
||||
// loadResourceAffectedPeersData loads the data needed to determine affected peers within a transaction.
|
||||
func loadResourceAffectedPeersData(ctx context.Context, transaction store.Store, accountID, networkID string, resourceGroupIDs []string) (*resourceAffectedPeersData, error) {
|
||||
if len(resourceGroupIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
policies, err := transaction.GetAccountPolicies(ctx, store.LockingStrengthNone, accountID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get policies: %w", err)
|
||||
}
|
||||
|
||||
routers, err := transaction.GetNetworkRoutersByNetID(ctx, store.LockingStrengthNone, accountID, networkID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get routers: %w", err)
|
||||
}
|
||||
|
||||
var routerPeerGroups []string
|
||||
var routerDirectPeers []string
|
||||
for _, router := range routers {
|
||||
if !router.Enabled {
|
||||
continue
|
||||
}
|
||||
routerPeerGroups = append(routerPeerGroups, router.PeerGroups...)
|
||||
if router.Peer != "" {
|
||||
routerDirectPeers = append(routerDirectPeers, router.Peer)
|
||||
}
|
||||
}
|
||||
|
||||
return &resourceAffectedPeersData{
|
||||
resourceGroupIDs: resourceGroupIDs,
|
||||
policies: policies,
|
||||
routerPeerGroups: routerPeerGroups,
|
||||
routerDirectPeers: routerDirectPeers,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// resolveResourceAffectedPeers computes affected peer IDs from preloaded data outside the transaction.
|
||||
func (m *managerImpl) resolveResourceAffectedPeers(ctx context.Context, accountID string, data *resourceAffectedPeersData) []string {
|
||||
if data == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
groupSet := make(map[string]struct{})
|
||||
var directPeerIDs []string
|
||||
|
||||
destSet := make(map[string]struct{}, len(data.resourceGroupIDs))
|
||||
for _, gID := range data.resourceGroupIDs {
|
||||
destSet[gID] = struct{}{}
|
||||
}
|
||||
|
||||
for _, policy := range data.policies {
|
||||
if policy == nil || !policy.Enabled {
|
||||
continue
|
||||
}
|
||||
for _, rule := range policy.Rules {
|
||||
if rule == nil || !rule.Enabled {
|
||||
continue
|
||||
}
|
||||
referencesResource := false
|
||||
for _, gID := range rule.Destinations {
|
||||
if _, ok := destSet[gID]; ok {
|
||||
referencesResource = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !referencesResource {
|
||||
continue
|
||||
}
|
||||
for _, gID := range rule.Sources {
|
||||
groupSet[gID] = struct{}{}
|
||||
}
|
||||
if rule.SourceResource.Type == nbtypes.ResourceTypePeer && rule.SourceResource.ID != "" {
|
||||
directPeerIDs = append(directPeerIDs, rule.SourceResource.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, gID := range data.routerPeerGroups {
|
||||
groupSet[gID] = struct{}{}
|
||||
}
|
||||
directPeerIDs = append(directPeerIDs, data.routerDirectPeers...)
|
||||
|
||||
if len(groupSet) == 0 && len(directPeerIDs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
groupIDs := make([]string, 0, len(groupSet))
|
||||
for gID := range groupSet {
|
||||
groupIDs = append(groupIDs, gID)
|
||||
}
|
||||
|
||||
peerIDs, err := m.store.GetPeerIDsByGroups(ctx, accountID, groupIDs)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to resolve peer IDs: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(directPeerIDs) > 0 {
|
||||
seen := make(map[string]struct{}, len(peerIDs))
|
||||
for _, id := range peerIDs {
|
||||
seen[id] = struct{}{}
|
||||
}
|
||||
for _, id := range directPeerIDs {
|
||||
if _, exists := seen[id]; !exists {
|
||||
peerIDs = append(peerIDs, id)
|
||||
seen[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return peerIDs, nil
|
||||
}
|
||||
|
||||
func NewManagerMock() Manager {
|
||||
return &mockManager{}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user