From 2d82d2fb83b664bf50b89e83ef249a7d7014cff1 Mon Sep 17 00:00:00 2001 From: Pascal Fischer Date: Thu, 13 Mar 2025 14:58:47 +0100 Subject: [PATCH] add buffer for update account peers --- management/server/account.go | 7 ++++--- management/server/peer.go | 30 ++++++++++++++++++++++++------ 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/management/server/account.go b/management/server/account.go index b7574f7c1..e32964bfb 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -68,7 +68,8 @@ type DefaultAccountManager struct { eventStore activity.Store geo geolocation.Geolocation - requestBuffer *AccountRequestBuffer + accountUpdateLocks sync.Map + requestBuffer *AccountRequestBuffer proxyController port_forwarding.Controller settingsManager settings.Manager @@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth if removedGroupAffectsPeers || newGroupsAffectsPeers { log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId) - am.UpdateAccountPeers(ctx, userAuth.AccountId) + am.BufferUpdateAccountPeers(ctx, userAuth.AccountId) } } @@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) { diff --git a/management/server/peer.go b/management/server/peer.go index 4e70fe6e3..5b9eb8994 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -135,7 +135,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK if expired { // we need to update other peers because when peer login expires all other peers are notified to disconnect from // the expired one. Here we notify them that connection is now allowed again. - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return nil @@ -302,7 +302,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user } if peerLabelChanged || requiresPeerUpdates { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } else if sshChanged { am.UpdateAccountPeer(ctx, accountID, peer.ID) } @@ -383,7 +383,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer } if updateAccountPeers { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return nil @@ -653,7 +653,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s unlock = nil if updateAccountPeers { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer) @@ -748,7 +748,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy } if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer) @@ -893,7 +893,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer unlockPeer = nil if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) { - am.UpdateAccountPeers(ctx, accountID) + am.BufferUpdateAccountPeers(ctx, accountID) } return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer) @@ -1226,6 +1226,24 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account } } +func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { + mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{}) + lock := mu.(*sync.Mutex) + + if !lock.TryLock() { + return + } + + go func() { + + time.Sleep(1 * time.Second) + am.UpdateAccountPeers(ctx, accountID) + + am.accountUpdateLocks.Delete(accountID) + lock.Unlock() + }() +} + // UpdateAccountPeer updates a single peer that belongs to an account. // Should be called when changes need to be synced to a specific peer only. func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {