diff --git a/management/internals/controllers/network_map/controller/controller.go b/management/internals/controllers/network_map/controller/controller.go index 5cbf6fceb..f13eafbcf 100644 --- a/management/internals/controllers/network_map/controller/controller.go +++ b/management/internals/controllers/network_map/controller/controller.go @@ -45,6 +45,7 @@ type Controller struct { accountUpdateLocks sync.Map sendAccountUpdateLocks sync.Map + affectedPeerUpdateLocks sync.Map updateAccountPeersBufferInterval atomic.Int64 // dnsDomain is used for peer resolution. This is appended to the peer's name dnsDomain string @@ -63,6 +64,13 @@ type bufferUpdate struct { update atomic.Bool } +type bufferAffectedUpdate struct { + sendMu sync.Mutex + dataMu sync.Mutex + next *time.Timer + peerIDs map[string]struct{} +} + var _ network_map.Controller = (*Controller)(nil) func NewController(ctx context.Context, store store.Store, metrics telemetry.AppMetrics, peersUpdateManager network_map.PeersUpdateManager, requestBuffer account.RequestBuffer, integratedPeerValidator integrated_validator.IntegratedValidator, settingsManager settings.Manager, dnsDomain string, proxyController port_forwarding.Controller, ephemeralPeersManager ephemeral.Manager, config *config.Config) *Controller { @@ -496,6 +504,98 @@ func (c *Controller) BufferUpdateAccountPeers(ctx context.Context, accountID str return nil } +// BufferUpdateAffectedPeers accumulates peer IDs across rapid successive calls +// and flushes them in a single sendUpdateForAffectedPeers call after the buffer interval. +func (c *Controller) BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) error { + if len(peerIDs) == 0 { + return nil + } + + log.WithContext(ctx).Tracef("buffer updating %d affected peers for account %s from %s", len(peerIDs), accountID, util.GetCallerName()) + + bufUpd, _ := c.affectedPeerUpdateLocks.LoadOrStore(accountID, &bufferAffectedUpdate{ + peerIDs: make(map[string]struct{}), + }) + b := bufUpd.(*bufferAffectedUpdate) + + // Always accumulate incoming peer IDs (non-blocking). + b.addPeerIDs(peerIDs) + + if !b.sendMu.TryLock() { + // Another goroutine is already sending; it will pick up our IDs on its next drain. + return nil + } + + b.stopTimer() + + collected := b.drainPeerIDs() + go func() { + defer b.sendMu.Unlock() + _ = c.sendUpdateForAffectedPeers(ctx, accountID, collected) + + // Check if more peer IDs accumulated while we were sending. + if !b.hasPending() { + return + } + + // Schedule a debounced flush for the newly accumulated IDs. + b.setTimer(time.Duration(c.updateAccountPeersBufferInterval.Load()), func() { + ids := b.drainPeerIDs() + if len(ids) > 0 { + _ = c.sendUpdateForAffectedPeers(ctx, accountID, ids) + } + }) + }() + + return nil +} + +func (b *bufferAffectedUpdate) addPeerIDs(ids []string) { + b.dataMu.Lock() + for _, id := range ids { + b.peerIDs[id] = struct{}{} + } + b.dataMu.Unlock() +} + +func (b *bufferAffectedUpdate) drainPeerIDs() []string { + b.dataMu.Lock() + defer b.dataMu.Unlock() + if len(b.peerIDs) == 0 { + return nil + } + ids := make([]string, 0, len(b.peerIDs)) + for id := range b.peerIDs { + ids = append(ids, id) + } + b.peerIDs = make(map[string]struct{}) + return ids +} + +func (b *bufferAffectedUpdate) hasPending() bool { + b.dataMu.Lock() + defer b.dataMu.Unlock() + return len(b.peerIDs) > 0 +} + +func (b *bufferAffectedUpdate) stopTimer() { + b.dataMu.Lock() + defer b.dataMu.Unlock() + if b.next != nil { + b.next.Stop() + } +} + +func (b *bufferAffectedUpdate) setTimer(d time.Duration, f func()) { + b.dataMu.Lock() + defer b.dataMu.Unlock() + if b.next == nil { + b.next = time.AfterFunc(d, f) + return + } + b.next.Reset(d) +} + func (c *Controller) GetValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) { if isRequiresApproval { network, err := c.repo.GetAccountNetwork(ctx, accountID) diff --git a/management/internals/controllers/network_map/interface.go b/management/internals/controllers/network_map/interface.go index 8d81556f9..4b9fdee12 100644 --- a/management/internals/controllers/network_map/interface.go +++ b/management/internals/controllers/network_map/interface.go @@ -20,6 +20,7 @@ const ( type Controller interface { UpdateAccountPeers(ctx context.Context, accountID string) error UpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) error + BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) error UpdateAccountPeer(ctx context.Context, accountId string, peerId string) error BufferUpdateAccountPeers(ctx context.Context, accountID string) error GetValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, p *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) diff --git a/management/internals/controllers/network_map/interface_mock.go b/management/internals/controllers/network_map/interface_mock.go index b2ef0b861..15b6bdc56 100644 --- a/management/internals/controllers/network_map/interface_mock.go +++ b/management/internals/controllers/network_map/interface_mock.go @@ -57,6 +57,20 @@ func (mr *MockControllerMockRecorder) BufferUpdateAccountPeers(ctx, accountID an return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockController)(nil).BufferUpdateAccountPeers), ctx, accountID) } +// BufferUpdateAffectedPeers mocks base method. +func (m *MockController) BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BufferUpdateAffectedPeers", ctx, accountID, peerIDs) + ret0, _ := ret[0].(error) + return ret0 +} + +// BufferUpdateAffectedPeers indicates an expected call of BufferUpdateAffectedPeers. +func (mr *MockControllerMockRecorder) BufferUpdateAffectedPeers(ctx, accountID, peerIDs any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAffectedPeers", reflect.TypeOf((*MockController)(nil).BufferUpdateAffectedPeers), ctx, accountID, peerIDs) +} + // CountStreams mocks base method. func (m *MockController) CountStreams() int { m.ctrl.T.Helper() diff --git a/management/server/account/manager.go b/management/server/account/manager.go index 576054d1e..9b4a8152c 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -126,6 +126,7 @@ type Manager interface { DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error UpdateAccountPeers(ctx context.Context, accountID string) UpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) + BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) BufferUpdateAccountPeers(ctx context.Context, accountID string) BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error) SyncUserJWTGroups(ctx context.Context, userAuth auth.UserAuth) error diff --git a/management/server/account/manager_mock.go b/management/server/account/manager_mock.go index c595346f8..0896255ac 100644 --- a/management/server/account/manager_mock.go +++ b/management/server/account/manager_mock.go @@ -122,6 +122,18 @@ func (mr *MockManagerMockRecorder) BufferUpdateAccountPeers(ctx, accountID inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAccountPeers", reflect.TypeOf((*MockManager)(nil).BufferUpdateAccountPeers), ctx, accountID) } +// BufferUpdateAffectedPeers mocks base method. +func (m *MockManager) BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BufferUpdateAffectedPeers", ctx, accountID, peerIDs) +} + +// BufferUpdateAffectedPeers indicates an expected call of BufferUpdateAffectedPeers. +func (mr *MockManagerMockRecorder) BufferUpdateAffectedPeers(ctx, accountID, peerIDs interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BufferUpdateAffectedPeers", reflect.TypeOf((*MockManager)(nil).BufferUpdateAffectedPeers), ctx, accountID, peerIDs) +} + // BuildUserInfosForAccount mocks base method. func (m *MockManager) BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error) { m.ctrl.T.Helper() diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 5a9009da7..d4580a4c6 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -130,6 +130,7 @@ type MockAccountManager struct { AllowSyncFunc func(string, uint64) bool UpdateAccountPeersFunc func(ctx context.Context, accountID string) UpdateAffectedPeersFunc func(ctx context.Context, accountID string, peerIDs []string) + BufferUpdateAffectedPeersFunc func(ctx context.Context, accountID string, peerIDs []string) BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string) RecalculateNetworkMapCacheFunc func(ctx context.Context, accountId string) error @@ -213,6 +214,12 @@ func (am *MockAccountManager) UpdateAffectedPeers(ctx context.Context, accountID } } +func (am *MockAccountManager) BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) { + if am.BufferUpdateAffectedPeersFunc != nil { + am.BufferUpdateAffectedPeersFunc(ctx, accountID, peerIDs) + } +} + func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { if am.BufferUpdateAccountPeersFunc != nil { am.BufferUpdateAccountPeersFunc(ctx, accountID) diff --git a/management/server/peer.go b/management/server/peer.go index 39368c840..509d53ebb 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -1326,6 +1326,12 @@ func (am *DefaultAccountManager) resolvePeerIDs(ctx context.Context, s store.Sto return peerIDs } +// BufferUpdateAffectedPeers accumulates peer IDs across rapid successive calls +// and flushes them in a single update after the buffer interval. +func (am *DefaultAccountManager) BufferUpdateAffectedPeers(ctx context.Context, accountID string, peerIDs []string) { + _ = am.networkMapController.BufferUpdateAffectedPeers(ctx, accountID, peerIDs) +} + func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { _ = am.networkMapController.BufferUpdateAccountPeers(ctx, accountID) }