add buffer for update account peers

This commit is contained in:
Pascal Fischer
2025-03-13 14:58:47 +01:00
parent ab2e3fec72
commit 2d82d2fb83
2 changed files with 28 additions and 9 deletions

View File

@@ -68,7 +68,8 @@ type DefaultAccountManager struct {
eventStore activity.Store
geo geolocation.Geolocation
requestBuffer *AccountRequestBuffer
accountUpdateLocks sync.Map
requestBuffer *AccountRequestBuffer
proxyController port_forwarding.Controller
settingsManager settings.Manager
@@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
if removedGroupAffectsPeers || newGroupsAffectsPeers {
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
am.UpdateAccountPeers(ctx, userAuth.AccountId)
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
}
}
@@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {

View File

@@ -135,7 +135,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again.
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return nil
@@ -302,7 +302,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
}
if peerLabelChanged || requiresPeerUpdates {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
} else if sshChanged {
am.UpdateAccountPeer(ctx, accountID, peer.ID)
}
@@ -383,7 +383,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return nil
@@ -653,7 +653,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
unlock = nil
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
@@ -748,7 +748,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
}
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
@@ -893,7 +893,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
unlockPeer = nil
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
@@ -1226,6 +1226,24 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
}
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
lock := mu.(*sync.Mutex)
if !lock.TryLock() {
return
}
go func() {
time.Sleep(1 * time.Second)
am.UpdateAccountPeers(ctx, accountID)
am.accountUpdateLocks.Delete(accountID)
lock.Unlock()
}()
}
// UpdateAccountPeer updates a single peer that belongs to an account.
// Should be called when changes need to be synced to a specific peer only.
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {