mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
[management] Remove all store locks from grpc side (#4374)
This commit is contained in:
@@ -1639,11 +1639,6 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
|
|||||||
log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start))
|
log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
|
||||||
defer accountUnlock()
|
|
||||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
|
||||||
defer peerUnlock()
|
|
||||||
|
|
||||||
peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
|
peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
|
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
|
||||||
@@ -1658,18 +1653,12 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
|
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
|
||||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
|
||||||
defer accountUnlock()
|
|
||||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
|
||||||
defer peerUnlock()
|
|
||||||
|
|
||||||
err := am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID)
|
err := am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
|
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error {
|
func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error {
|
||||||
@@ -1678,12 +1667,6 @@ func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey st
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
|
||||||
defer unlock()
|
|
||||||
|
|
||||||
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
|
||||||
defer unlockPeer()
|
|
||||||
|
|
||||||
_, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, UpdateAccountPeers: true}, accountID)
|
_, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, UpdateAccountPeers: true}, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return mapError(ctx, err)
|
return mapError(ctx, err)
|
||||||
|
|||||||
@@ -609,13 +609,6 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
newPeer.DNSLabel = freeLabel
|
newPeer.DNSLabel = freeLabel
|
||||||
newPeer.IP = freeIP
|
newPeer.IP = freeIP
|
||||||
|
|
||||||
unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
|
||||||
defer func() {
|
|
||||||
if unlock != nil {
|
|
||||||
unlock()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||||
err = transaction.AddPeerToAccount(ctx, newPeer)
|
err = transaction.AddPeerToAccount(ctx, newPeer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -667,14 +660,10 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
unlock()
|
|
||||||
unlock = nil
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if isUniqueConstraintError(err) {
|
if isUniqueConstraintError(err) {
|
||||||
unlock()
|
|
||||||
unlock = nil
|
|
||||||
log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -833,15 +822,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
unlockAccount := am.Store.AcquireReadLockByUID(ctx, accountID)
|
|
||||||
defer unlockAccount()
|
|
||||||
unlockPeer := am.Store.AcquireWriteLockByUID(ctx, login.WireGuardPubKey)
|
|
||||||
defer func() {
|
|
||||||
if unlockPeer != nil {
|
|
||||||
unlockPeer()
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
var peer *nbpeer.Peer
|
var peer *nbpeer.Peer
|
||||||
var updateRemotePeers bool
|
var updateRemotePeers bool
|
||||||
var isRequiresApproval bool
|
var isRequiresApproval bool
|
||||||
@@ -922,9 +902,6 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
|||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
unlockPeer()
|
|
||||||
unlockPeer = nil
|
|
||||||
|
|
||||||
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
||||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -163,25 +163,6 @@ func (s *SqlStore) AcquireWriteLockByUID(ctx context.Context, uniqueID string) (
|
|||||||
return unlock
|
return unlock
|
||||||
}
|
}
|
||||||
|
|
||||||
// AcquireReadLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
|
|
||||||
func (s *SqlStore) AcquireReadLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
|
|
||||||
log.WithContext(ctx).Tracef("acquiring read lock for ID %s", uniqueID)
|
|
||||||
|
|
||||||
startWait := time.Now()
|
|
||||||
value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
|
|
||||||
mtx := value.(*sync.RWMutex)
|
|
||||||
mtx.RLock()
|
|
||||||
log.WithContext(ctx).Tracef("waiting to acquire read lock for ID %s in %v", uniqueID, time.Since(startWait))
|
|
||||||
startHold := time.Now()
|
|
||||||
|
|
||||||
unlock = func() {
|
|
||||||
mtx.RUnlock()
|
|
||||||
log.WithContext(ctx).Tracef("released read lock for ID %s in %v", uniqueID, time.Since(startHold))
|
|
||||||
}
|
|
||||||
|
|
||||||
return unlock
|
|
||||||
}
|
|
||||||
|
|
||||||
// Deprecated: Full account operations are no longer supported
|
// Deprecated: Full account operations are no longer supported
|
||||||
func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) error {
|
func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|||||||
@@ -170,8 +170,6 @@ type Store interface {
|
|||||||
|
|
||||||
// AcquireWriteLockByUID should attempt to acquire a lock for write purposes and return a function that releases the lock
|
// AcquireWriteLockByUID should attempt to acquire a lock for write purposes and return a function that releases the lock
|
||||||
AcquireWriteLockByUID(ctx context.Context, uniqueID string) func()
|
AcquireWriteLockByUID(ctx context.Context, uniqueID string) func()
|
||||||
// AcquireReadLockByUID should attempt to acquire lock for read purposes and return a function that releases the lock
|
|
||||||
AcquireReadLockByUID(ctx context.Context, uniqueID string) func()
|
|
||||||
// AcquireGlobalLock should attempt to acquire a global lock and return a function that releases the lock
|
// AcquireGlobalLock should attempt to acquire a global lock and return a function that releases the lock
|
||||||
AcquireGlobalLock(ctx context.Context) func()
|
AcquireGlobalLock(ctx context.Context) func()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user