[management] add monitoring for nmap update source (#6036)

This commit is contained in:
Pascal Fischer
2026-04-30 14:52:54 +02:00
committed by GitHub
parent 3fc5a8d4a1
commit f29f5a0978
28 changed files with 165 additions and 86 deletions

View File

@@ -257,7 +257,10 @@ func (c *Controller) bufferSendUpdateAccountPeers(ctx context.Context, accountID
// UpdatePeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers.
func (c *Controller) UpdateAccountPeers(ctx context.Context, accountID string) error {
func (c *Controller) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
if c.accountManagerMetrics != nil {
c.accountManagerMetrics.CountUpdateAccountPeersTriggered(string(reason.Resource), string(reason.Operation))
}
return c.sendUpdateAccountPeers(ctx, accountID)
}
@@ -331,9 +334,13 @@ func (c *Controller) UpdateAccountPeer(ctx context.Context, accountId string, pe
return nil
}
func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID string) error {
func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
log.WithContext(ctx).Tracef("buffer updating peers for account %s from %s", accountID, util.GetCallerName())
if c.accountManagerMetrics != nil {
c.accountManagerMetrics.CountUpdateAccountPeersTriggered(string(reason.Resource), string(reason.Operation))
}
bufUpd, _ := c.accountUpdateLocks.LoadOrStore(accountID, &bufferUpdate{})
b := bufUpd.(*bufferUpdate)
@@ -348,14 +355,14 @@ func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID str
go func() {
defer b.mu.Unlock()
_ = c.UpdateAccountPeers(ctx, accountID)
_ = c.sendUpdateAccountPeers(ctx, accountID)
if !b.update.Load() {
return
}
b.update.Store(false)
if b.next == nil {
b.next = time.AfterFunc(time.Duration(c.updateAccountPeersBufferInterval.Load()), func() {
_ = c.UpdateAccountPeers(ctx, accountID)
_ = c.sendUpdateAccountPeers(ctx, accountID)
})
return
}

View File

@@ -18,9 +18,9 @@ const (
)
type Controller interface {
UpdateAccountPeers(ctx context.Context, accountID string) error
UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error
UpdateAccountPeer(ctx context.Context, accountId string, peerId string) error
BufferUpdateAccountPeers(ctx context.Context, accountID string) error
BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error
GetValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, p *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
GetDNSDomain(settings *types.Settings) string
StartWarmup(context.Context)

View File

@@ -44,17 +44,17 @@ func (m *MockController) EXPECT() *MockControllerMockRecorder {
}
// BufferUpdateAccountPeers mocks base method.
func (m *MockController) BufferUpdateAccountPeers(ctx context.Context, accountID string) error {
func (m *MockController) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BufferUpdateAccountPeers", ctx, accountID)
ret := m.ctrl.Call(m, "BufferUpdateAccountPeers", ctx, accountID, reason)
ret0, _ := ret[0].(error)
return ret0
}
// BufferUpdateAccountPeers indicates an expected call of BufferUpdateAccountPeers.
func (mr *MockControllerMockRecorder) BufferUpdateAccountPeers(ctx, accountID any) *gomock.Call {
func (mr *MockControllerMockRecorder) BufferUpdateAccountPeers(ctx, accountID, reason any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockController)(nil).BufferUpdateAccountPeers), ctx, accountID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockController)(nil).BufferUpdateAccountPeers), ctx, accountID, reason)
}
// CountStreams mocks base method.
@@ -238,15 +238,15 @@ func (mr *MockControllerMockRecorder) UpdateAccountPeer(ctx, accountId, peerId a
}
// UpdateAccountPeers mocks base method.
func (m *MockController) UpdateAccountPeers(ctx context.Context, accountID string) error {
func (m *MockController) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateAccountPeers", ctx, accountID)
ret := m.ctrl.Call(m, "UpdateAccountPeers", ctx, accountID, reason)
ret0, _ := ret[0].(error)
return ret0
}
// UpdateAccountPeers indicates an expected call of UpdateAccountPeers.
func (mr *MockControllerMockRecorder) UpdateAccountPeers(ctx, accountID any) *gomock.Call {
func (mr *MockControllerMockRecorder) UpdateAccountPeers(ctx, accountID, reason any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAccountPeers", reflect.TypeOf((*MockController)(nil).UpdateAccountPeers), ctx, accountID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAccountPeers", reflect.TypeOf((*MockController)(nil).UpdateAccountPeers), ctx, accountID, reason)
}

View File

@@ -62,7 +62,7 @@ func (a *MockAccountManager) GetDeletePeerCalls() int {
return a.deletePeerCalls
}
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
a.mu.Lock()
defer a.mu.Unlock()
if a.bufferUpdateCalls == nil {
@@ -248,7 +248,7 @@ func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
return err
}
}
mockAM.BufferUpdateAccountPeers(ctx, accountID)
mockAM.BufferUpdateAccountPeers(ctx, accountID, types.UpdateReason{})
return nil
}).
Times(1)

View File

@@ -20,6 +20,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -178,7 +179,7 @@ func (m *managerImpl) DeletePeers(ctx context.Context, accountID string, peerIDs
}
}
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourcePeer, Operation: types.UpdateOperationDelete})
return nil
}

View File

@@ -85,7 +85,7 @@ func setupL4Test(t *testing.T, customPortsSupported *bool) (*Manager, store.Stor
accountMgr := &mock_server.MockAccountManager{
StoreEventFunc: func(_ context.Context, _, _, _ string, _ activity.ActivityDescriber, _ map[string]any) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string, _ types.UpdateReason) {},
GetGroupByNameFunc: func(ctx context.Context, groupName, accountID, userID string) (*types.Group, error) {
return testStore.GetGroupByName(ctx, store.LockingStrengthNone, accountID, groupName)
},

View File

@@ -25,6 +25,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -231,7 +232,7 @@ func (m *Manager) CreateService(ctx context.Context, accountID, userID string, s
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, s.ToProtoMapping(service.Create, "", m.proxyController.GetOIDCValidationConfig()), s.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationCreate})
return s, nil
}
@@ -515,7 +516,7 @@ func (m *Manager) UpdateService(ctx context.Context, accountID, userID string, s
}
m.sendServiceUpdateNotifications(ctx, accountID, service, updateInfo)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationUpdate})
return service, nil
}
@@ -819,7 +820,7 @@ func (m *Manager) DeleteService(ctx context.Context, accountID, userID, serviceI
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, s.ToProtoMapping(service.Delete, "", m.proxyController.GetOIDCValidationConfig()), s.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationDelete})
return nil
}
@@ -860,7 +861,7 @@ func (m *Manager) DeleteAllServices(ctx context.Context, accountID, userID strin
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, svc.ToProtoMapping(service.Delete, "", oidcCfg), svc.ProxyCluster)
}
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationDelete})
return nil
}
@@ -916,7 +917,7 @@ func (m *Manager) ReloadService(ctx context.Context, accountID, serviceID string
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, s.ToProtoMapping(service.Update, "", m.proxyController.GetOIDCValidationConfig()), s.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationUpdate})
return nil
}
@@ -1098,7 +1099,7 @@ func (m *Manager) CreateServiceFromPeer(ctx context.Context, accountID, peerID s
}
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, svc.ToProtoMapping(service.Create, "", m.proxyController.GetOIDCValidationConfig()), svc.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationCreate})
serviceURL := "https://" + svc.Domain
if service.IsL4Protocol(svc.Mode) {
@@ -1210,7 +1211,7 @@ func (m *Manager) deletePeerService(ctx context.Context, accountID, peerID, serv
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, svc.ToProtoMapping(service.Delete, "", m.proxyController.GetOIDCValidationConfig()), svc.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationDelete})
return nil
}
@@ -1261,7 +1262,7 @@ func (m *Manager) deleteExpiredPeerService(ctx context.Context, accountID, peerI
meta := addPeerInfoToEventMeta(svc.EventMeta(), peer)
m.accountManager.StoreEvent(ctx, peerID, serviceID, accountID, activity.PeerServiceExposeExpired, meta)
m.proxyController.SendServiceUpdateToCluster(ctx, accountID, svc.ToProtoMapping(service.Delete, "", m.proxyController.GetOIDCValidationConfig()), svc.ProxyCluster)
m.accountManager.UpdateAccountPeers(ctx, accountID)
m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceService, Operation: types.UpdateOperationDelete})
return nil
}

View File

@@ -447,7 +447,7 @@ func TestDeletePeerService_SourcePeerValidation(t *testing.T) {
StoreEventFunc: func(_ context.Context, _, _, _ string, activityID activity.ActivityDescriber, _ map[string]any) {
storedActivity = activityID.(activity.Activity)
},
UpdateAccountPeersFunc: func(_ context.Context, _ string) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string, _ types.UpdateReason) {},
}
mockStore.EXPECT().
@@ -549,7 +549,7 @@ func TestDeletePeerService_SourcePeerValidation(t *testing.T) {
StoreEventFunc: func(_ context.Context, _, _, _ string, activityID activity.ActivityDescriber, _ map[string]any) {
storedActivity = activityID.(activity.Activity)
},
UpdateAccountPeersFunc: func(_ context.Context, _ string) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string, _ types.UpdateReason) {},
}
mockStore.EXPECT().
@@ -593,7 +593,7 @@ func TestDeletePeerService_SourcePeerValidation(t *testing.T) {
StoreEventFunc: func(_ context.Context, _, _, _ string, _ activity.ActivityDescriber, meta map[string]any) {
storedMeta = meta
},
UpdateAccountPeersFunc: func(_ context.Context, _ string) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string, _ types.UpdateReason) {},
}
mockStore.EXPECT().
@@ -704,7 +704,7 @@ func setupIntegrationTest(t *testing.T) (*Manager, store.Store) {
accountMgr := &mock_server.MockAccountManager{
StoreEventFunc: func(_ context.Context, _, _, _ string, _ activity.ActivityDescriber, _ map[string]any) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string) {},
UpdateAccountPeersFunc: func(_ context.Context, _ string, _ types.UpdateReason) {},
GetGroupByNameFunc: func(ctx context.Context, groupName, accountID, userID string) (*types.Group, error) {
return testStore.GetGroupByName(ctx, store.LockingStrengthNone, accountID, groupName)
},
@@ -1173,7 +1173,7 @@ func TestDeleteService_DeletesTargets(t *testing.T) {
mockAcct.EXPECT().
StoreEvent(ctx, userID, service.ID, accountID, activity.ServiceDeleted, gomock.Any())
mockAcct.EXPECT().
UpdateAccountPeers(ctx, accountID)
UpdateAccountPeers(ctx, accountID, gomock.Any())
err = mgr.DeleteService(ctx, accountID, userID, service.ID)
require.NoError(t, err)

View File

@@ -11,6 +11,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -144,7 +145,7 @@ func (m *managerImpl) UpdateZone(ctx context.Context, accountID, userID string,
m.accountManager.StoreEvent(ctx, userID, zone.ID, accountID, activity.DNSZoneUpdated, zone.EventMeta())
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceZone, Operation: types.UpdateOperationUpdate})
return zone, nil
}
@@ -206,7 +207,7 @@ func (m *managerImpl) DeleteZone(ctx context.Context, accountID, userID, zoneID
event()
}
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceZone, Operation: types.UpdateOperationDelete})
return nil
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -95,7 +96,7 @@ func (m *managerImpl) CreateRecord(ctx context.Context, accountID, userID, zoneI
meta := record.EventMeta(zone.ID, zone.Name)
m.accountManager.StoreEvent(ctx, userID, record.ID, accountID, activity.DNSRecordCreated, meta)
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceZoneRecord, Operation: types.UpdateOperationCreate})
return record, nil
}
@@ -154,7 +155,7 @@ func (m *managerImpl) UpdateRecord(ctx context.Context, accountID, userID, zoneI
meta := record.EventMeta(zone.ID, zone.Name)
m.accountManager.StoreEvent(ctx, userID, record.ID, accountID, activity.DNSRecordUpdated, meta)
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceZoneRecord, Operation: types.UpdateOperationUpdate})
return record, nil
}
@@ -201,7 +202,7 @@ func (m *managerImpl) DeleteRecord(ctx context.Context, accountID, userID, zoneI
meta := record.EventMeta(zone.ID, zone.Name)
m.accountManager.StoreEvent(ctx, userID, recordID, accountID, activity.DNSRecordDeleted, meta)
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceZoneRecord, Operation: types.UpdateOperationDelete})
return nil
}

View File

@@ -400,7 +400,7 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
}
if updateAccountPeers || extraSettingsChanged || groupChangesAffectPeers {
go am.UpdateAccountPeers(ctx, accountID)
go am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceAccountSettings, Operation: types.UpdateOperationUpdate})
}
return newSettings, nil
@@ -1581,7 +1581,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
if removedGroupAffectsPeers || newGroupsAffectsPeers {
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId, types.UpdateReason{Resource: types.UpdateResourceUser, Operation: types.UpdateOperationUpdate})
}
return nil

View File

@@ -124,8 +124,8 @@ type Manager interface {
GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
UpdateAccountPeers(ctx context.Context, accountID string)
BufferUpdateAccountPeers(ctx context.Context, accountID string)
UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason)
BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason)
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
SyncUserJWTGroups(ctx context.Context, userAuth auth.UserAuth) error
GetStore() store.Store

View File

@@ -111,15 +111,15 @@ func (mr *MockManagerMockRecorder) ApproveUser(ctx, accountID, initiatorUserID,
}
// BufferUpdateAccountPeers mocks base method.
func (m *MockManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
func (m *MockManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "BufferUpdateAccountPeers", ctx, accountID)
m.ctrl.Call(m, "BufferUpdateAccountPeers", ctx, accountID, reason)
}
// BufferUpdateAccountPeers indicates an expected call of BufferUpdateAccountPeers.
func (mr *MockManagerMockRecorder) BufferUpdateAccountPeers(ctx, accountID interface{}) *gomock.Call {
func (mr *MockManagerMockRecorder) BufferUpdateAccountPeers(ctx, accountID, reason interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockManager)(nil).BufferUpdateAccountPeers), ctx, accountID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockManager)(nil).BufferUpdateAccountPeers), ctx, accountID, reason)
}
// BuildUserInfosForAccount mocks base method.
@@ -1597,15 +1597,15 @@ func (mr *MockManagerMockRecorder) UpdateAccountOnboarding(ctx, accountID, userI
}
// UpdateAccountPeers mocks base method.
func (m *MockManager) UpdateAccountPeers(ctx context.Context, accountID string) {
func (m *MockManager) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "UpdateAccountPeers", ctx, accountID)
m.ctrl.Call(m, "UpdateAccountPeers", ctx, accountID, reason)
}
// UpdateAccountPeers indicates an expected call of UpdateAccountPeers.
func (mr *MockManagerMockRecorder) UpdateAccountPeers(ctx, accountID interface{}) *gomock.Call {
func (mr *MockManagerMockRecorder) UpdateAccountPeers(ctx, accountID, reason interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAccountPeers", reflect.TypeOf((*MockManager)(nil).UpdateAccountPeers), ctx, accountID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateAccountPeers", reflect.TypeOf((*MockManager)(nil).UpdateAccountPeers), ctx, accountID, reason)
}
// UpdateAccountSettings mocks base method.

View File

@@ -86,7 +86,7 @@ func (am *DefaultAccountManager) SaveDNSSettings(ctx context.Context, accountID
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceDNSSettings, Operation: types.UpdateOperationUpdate})
}
return nil

View File

@@ -117,7 +117,7 @@ func (am *DefaultAccountManager) CreateGroup(ctx context.Context, accountID, use
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationCreate})
}
return nil
@@ -185,7 +185,7 @@ func (am *DefaultAccountManager) UpdateGroup(ctx context.Context, accountID, use
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -253,7 +253,7 @@ func (am *DefaultAccountManager) CreateGroups(ctx context.Context, accountID, us
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationCreate})
}
return globalErr
@@ -321,7 +321,7 @@ func (am *DefaultAccountManager) UpdateGroups(ctx context.Context, accountID, us
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return globalErr
@@ -493,7 +493,7 @@ func (am *DefaultAccountManager) GroupAddPeer(ctx context.Context, accountID, gr
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -531,7 +531,7 @@ func (am *DefaultAccountManager) GroupAddResource(ctx context.Context, accountID
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -559,7 +559,7 @@ func (am *DefaultAccountManager) GroupDeletePeer(ctx context.Context, accountID,
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -597,7 +597,7 @@ func (am *DefaultAccountManager) GroupDeleteResource(ctx context.Context, accoun
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceGroup, Operation: types.UpdateOperationUpdate})
}
return nil

View File

@@ -128,8 +128,8 @@ type MockAccountManager struct {
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
AllowSyncFunc func(string, uint64) bool
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
UpdateAccountPeersFunc func(ctx context.Context, accountID string, reason types.UpdateReason)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string, reason types.UpdateReason)
RecalculateNetworkMapCacheFunc func(ctx context.Context, accountId string) error
GetIdentityProviderFunc func(ctx context.Context, accountID, idpID, userID string) (*types.IdentityProvider, error)
@@ -200,15 +200,15 @@ func (am *MockAccountManager) UpdateGroups(ctx context.Context, accountID, userI
return status.Errorf(codes.Unimplemented, "method UpdateGroups is not implemented")
}
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
if am.UpdateAccountPeersFunc != nil {
am.UpdateAccountPeersFunc(ctx, accountID)
am.UpdateAccountPeersFunc(ctx, accountID, reason)
}
}
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
if am.BufferUpdateAccountPeersFunc != nil {
am.BufferUpdateAccountPeersFunc(ctx, accountID)
am.BufferUpdateAccountPeersFunc(ctx, accountID, reason)
}
}

View File

@@ -82,7 +82,7 @@ func (am *DefaultAccountManager) CreateNameServerGroup(ctx context.Context, acco
am.StoreEvent(ctx, userID, newNSGroup.ID, accountID, activity.NameserverGroupCreated, newNSGroup.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceNameServerGroup, Operation: types.UpdateOperationCreate})
}
return newNSGroup.Copy(), nil
@@ -133,7 +133,7 @@ func (am *DefaultAccountManager) SaveNameServerGroup(ctx context.Context, accoun
am.StoreEvent(ctx, userID, nsGroupToSave.ID, accountID, activity.NameserverGroupUpdated, nsGroupToSave.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceNameServerGroup, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -176,7 +176,7 @@ func (am *DefaultAccountManager) DeleteNameServerGroup(ctx context.Context, acco
am.StoreEvent(ctx, userID, nsGroup.ID, accountID, activity.NameserverGroupDeleted, nsGroup.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceNameServerGroup, Operation: types.UpdateOperationDelete})
}
return nil

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
serverTypes "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -177,7 +178,7 @@ func (m *managerImpl) DeleteNetwork(ctx context.Context, accountID, userID, netw
event()
}
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, serverTypes.UpdateReason{Resource: serverTypes.UpdateResourceNetwork, Operation: serverTypes.UpdateOperationDelete})
return nil
}

View File

@@ -162,7 +162,7 @@ func (m *managerImpl) CreateResource(ctx context.Context, userID string, resourc
event()
}
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID)
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID, nbtypes.UpdateReason{Resource: nbtypes.UpdateResourceNetworkResource, Operation: nbtypes.UpdateOperationCreate})
return resource, nil
}
@@ -270,7 +270,7 @@ func (m *managerImpl) UpdateResource(ctx context.Context, userID string, resourc
}
}()
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID)
go m.accountManager.UpdateAccountPeers(ctx, resource.AccountID, nbtypes.UpdateReason{Resource: nbtypes.UpdateResourceNetworkResource, Operation: nbtypes.UpdateOperationUpdate})
return resource, nil
}
@@ -352,7 +352,7 @@ func (m *managerImpl) DeleteResource(ctx context.Context, accountID, userID, net
event()
}
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, nbtypes.UpdateReason{Resource: nbtypes.UpdateResourceNetworkResource, Operation: nbtypes.UpdateOperationDelete})
return nil
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/store"
serverTypes "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -119,7 +120,7 @@ func (m *managerImpl) CreateRouter(ctx context.Context, userID string, router *t
m.accountManager.StoreEvent(ctx, userID, router.ID, router.AccountID, activity.NetworkRouterCreated, router.EventMeta(network))
go m.accountManager.UpdateAccountPeers(ctx, router.AccountID)
go m.accountManager.UpdateAccountPeers(ctx, router.AccountID, serverTypes.UpdateReason{Resource: serverTypes.UpdateResourceNetworkRouter, Operation: serverTypes.UpdateOperationCreate})
return router, nil
}
@@ -183,7 +184,7 @@ func (m *managerImpl) UpdateRouter(ctx context.Context, userID string, router *t
m.accountManager.StoreEvent(ctx, userID, router.ID, router.AccountID, activity.NetworkRouterUpdated, router.EventMeta(network))
go m.accountManager.UpdateAccountPeers(ctx, router.AccountID)
go m.accountManager.UpdateAccountPeers(ctx, router.AccountID, serverTypes.UpdateReason{Resource: serverTypes.UpdateResourceNetworkRouter, Operation: serverTypes.UpdateOperationUpdate})
return router, nil
}
@@ -217,7 +218,7 @@ func (m *managerImpl) DeleteRouter(ctx context.Context, accountID, userID, netwo
event()
go m.accountManager.UpdateAccountPeers(ctx, accountID)
go m.accountManager.UpdateAccountPeers(ctx, accountID, serverTypes.UpdateReason{Resource: serverTypes.UpdateResourceNetworkRouter, Operation: serverTypes.UpdateOperationDelete})
return nil
}

View File

@@ -1221,12 +1221,12 @@ func (am *DefaultAccountManager) GetPeer(ctx context.Context, accountID, peerID,
// UpdateAccountPeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers.
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
_ = am.networkMapController.UpdateAccountPeers(ctx, accountID)
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
_ = am.networkMapController.UpdateAccountPeers(ctx, accountID, reason)
}
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
_ = am.networkMapController.BufferUpdateAccountPeers(ctx, accountID)
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string, reason types.UpdateReason) {
_ = am.networkMapController.BufferUpdateAccountPeers(ctx, accountID, reason)
}
// UpdateAccountPeer updates a single peer that belongs to an account.

View File

@@ -975,7 +975,7 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
start := time.Now()
for i := 0; i < b.N; i++ {
manager.UpdateAccountPeers(ctx, account.Id)
manager.UpdateAccountPeers(ctx, account.Id, types.UpdateReason{})
}
duration := time.Since(start)
@@ -1033,7 +1033,7 @@ func testUpdateAccountPeers(t *testing.T) {
peerChannels[peerID] = updateManager.CreateChannel(ctx, peerID)
}
manager.UpdateAccountPeers(ctx, account.Id)
manager.UpdateAccountPeers(ctx, account.Id, types.UpdateReason{})
for _, channel := range peerChannels {
update := <-channel

View File

@@ -96,7 +96,11 @@ func (am *DefaultAccountManager) SavePolicy(ctx context.Context, accountID, user
am.StoreEvent(ctx, userID, policy.ID, accountID, action, policy.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
policyOp := types.UpdateOperationCreate
if isUpdate {
policyOp = types.UpdateOperationUpdate
}
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourcePolicy, Operation: policyOp})
}
return policy, nil
@@ -139,7 +143,7 @@ func (am *DefaultAccountManager) DeletePolicy(ctx context.Context, accountID, po
am.StoreEvent(ctx, userID, policyID, accountID, activity.PolicyRemoved, policy.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourcePolicy, Operation: types.UpdateOperationDelete})
}
return nil

View File

@@ -11,6 +11,7 @@ import (
"github.com/netbirdio/netbird/management/server/permissions/operations"
"github.com/netbirdio/netbird/management/server/posture"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -76,7 +77,11 @@ func (am *DefaultAccountManager) SavePostureChecks(ctx context.Context, accountI
am.StoreEvent(ctx, userID, postureChecks.ID, accountID, action, postureChecks.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
postureOp := types.UpdateOperationCreate
if isUpdate {
postureOp = types.UpdateOperationUpdate
}
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourcePostureCheck, Operation: postureOp})
}
return postureChecks, nil

View File

@@ -191,7 +191,7 @@ func (am *DefaultAccountManager) CreateRoute(ctx context.Context, accountID stri
am.StoreEvent(ctx, userID, string(newRoute.ID), accountID, activity.RouteCreated, newRoute.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceRoute, Operation: types.UpdateOperationCreate})
}
return newRoute, nil
@@ -245,7 +245,7 @@ func (am *DefaultAccountManager) SaveRoute(ctx context.Context, accountID, userI
am.StoreEvent(ctx, userID, string(routeToSave.ID), accountID, activity.RouteUpdated, routeToSave.EventMeta())
if oldRouteAffectsPeers || newRouteAffectsPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceRoute, Operation: types.UpdateOperationUpdate})
}
return nil
@@ -288,7 +288,7 @@ func (am *DefaultAccountManager) DeleteRoute(ctx context.Context, accountID stri
am.StoreEvent(ctx, userID, string(route.ID), accountID, activity.RouteRemoved, route.EventMeta())
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceRoute, Operation: types.UpdateOperationDelete})
}
return nil

View File

@@ -4,6 +4,7 @@ import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
@@ -11,6 +12,7 @@ import (
type AccountManagerMetrics struct {
ctx context.Context
updateAccountPeersDurationMs metric.Float64Histogram
updateAccountPeersCounter metric.Int64Counter
getPeerNetworkMapDurationMs metric.Float64Histogram
networkMapObjectCount metric.Int64Histogram
peerMetaUpdateCount metric.Int64Counter
@@ -48,6 +50,13 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
return nil, err
}
updateAccountPeersCounter, err := meter.Int64Counter("management.account.update.account.peers.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of account peers updates triggered, labeled by resource and operation"))
if err != nil {
return nil, err
}
peerMetaUpdateCount, err := meter.Int64Counter("management.account.peer.meta.update.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates with new meta data from the peers"))
@@ -59,6 +68,7 @@ func NewAccountManagerMetrics(ctx context.Context, meter metric.Meter) (*Account
ctx: ctx,
getPeerNetworkMapDurationMs: getPeerNetworkMapDurationMs,
updateAccountPeersDurationMs: updateAccountPeersDurationMs,
updateAccountPeersCounter: updateAccountPeersCounter,
networkMapObjectCount: networkMapObjectCount,
peerMetaUpdateCount: peerMetaUpdateCount,
}, nil
@@ -80,6 +90,16 @@ func (metrics *AccountManagerMetrics) CountNetworkMapObjects(count int64) {
metrics.networkMapObjectCount.Record(metrics.ctx, count)
}
// CountUpdateAccountPeersTriggered increments the counter for account peers updates with resource and operation labels.
func (metrics *AccountManagerMetrics) CountUpdateAccountPeersTriggered(resource, operation string) {
metrics.updateAccountPeersCounter.Add(metrics.ctx, 1,
metric.WithAttributes(
attribute.String("resource", resource),
attribute.String("operation", operation),
),
)
}
// CountPeerMetUpdate counts the number of peer meta updates
func (metrics *AccountManagerMetrics) CountPeerMetUpdate() {
metrics.peerMetaUpdateCount.Add(metrics.ctx, 1)

View File

@@ -0,0 +1,37 @@
package types
// UpdateReason describes why an account peers update was triggered.
type UpdateReason struct {
Resource UpdateResource
Operation UpdateOperation
}
// UpdateResource represents the kind of resource that triggered an account peers update.
type UpdateResource string
const (
UpdateResourceAccountSettings UpdateResource = "account_settings"
UpdateResourceDNSSettings UpdateResource = "dns_settings"
UpdateResourceGroup UpdateResource = "group"
UpdateResourceNameServerGroup UpdateResource = "nameserver_group"
UpdateResourcePolicy UpdateResource = "policy"
UpdateResourcePostureCheck UpdateResource = "posture_check"
UpdateResourceRoute UpdateResource = "route"
UpdateResourceUser UpdateResource = "user"
UpdateResourcePeer UpdateResource = "peer"
UpdateResourceNetwork UpdateResource = "network"
UpdateResourceNetworkResource UpdateResource = "network_resource"
UpdateResourceNetworkRouter UpdateResource = "network_router"
UpdateResourceService UpdateResource = "service"
UpdateResourceZone UpdateResource = "zone"
UpdateResourceZoneRecord UpdateResource = "zone_record"
)
// UpdateOperation represents the kind of change that triggered the update.
type UpdateOperation string
const (
UpdateOperationCreate UpdateOperation = "create"
UpdateOperationUpdate UpdateOperation = "update"
UpdateOperationDelete UpdateOperation = "delete"
)

View File

@@ -675,7 +675,7 @@ func (am *DefaultAccountManager) SaveOrAddUsers(ctx context.Context, accountID,
if err = am.Store.IncrementNetworkSerial(ctx, accountID); err != nil {
return nil, fmt.Errorf("failed to increment network serial: %w", err)
}
am.UpdateAccountPeers(ctx, accountID)
am.UpdateAccountPeers(ctx, accountID, types.UpdateReason{Resource: types.UpdateResourceUser, Operation: types.UpdateOperationUpdate})
}
return updatedUsersInfo, globalErr