mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
[management] Fix/delete groups without lock (#5012)
This commit is contained in:
@@ -276,6 +276,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
unlock()
|
||||
unlock = nil
|
||||
|
||||
log.WithContext(ctx).Debugf("Sync took %s", time.Since(reqStart))
|
||||
|
||||
s.syncSem.Add(-1)
|
||||
|
||||
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
||||
@@ -565,6 +567,7 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart), accountID)
|
||||
}
|
||||
log.WithContext(ctx).Debugf("Login took %s", time.Since(reqStart))
|
||||
}()
|
||||
|
||||
if loginReq.GetMeta() == nil {
|
||||
|
||||
@@ -427,7 +427,7 @@ func (am *DefaultAccountManager) DeleteGroups(ctx context.Context, accountID, us
|
||||
|
||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
for _, groupID := range groupIDs {
|
||||
group, err := transaction.GetGroupByID(ctx, store.LockingStrengthUpdate, accountID, groupID)
|
||||
group, err := transaction.GetGroupByID(ctx, store.LockingStrengthNone, accountID, groupID)
|
||||
if err != nil {
|
||||
allErrors = errors.Join(allErrors, err)
|
||||
continue
|
||||
@@ -442,6 +442,10 @@ func (am *DefaultAccountManager) DeleteGroups(ctx context.Context, accountID, us
|
||||
deletedGroups = append(deletedGroups, group)
|
||||
}
|
||||
|
||||
if len(groupIDsToDelete) == 0 {
|
||||
return allErrors
|
||||
}
|
||||
|
||||
if err = transaction.DeleteGroups(ctx, accountID, groupIDsToDelete); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -63,6 +63,8 @@ type SqlStore struct {
|
||||
installationPK int
|
||||
storeEngine types.Engine
|
||||
pool *pgxpool.Pool
|
||||
|
||||
transactionTimeout time.Duration
|
||||
}
|
||||
|
||||
type installation struct {
|
||||
@@ -84,6 +86,14 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
||||
conns = runtime.NumCPU()
|
||||
}
|
||||
|
||||
transactionTimeout := 5 * time.Minute
|
||||
if v := os.Getenv("NB_STORE_TRANSACTION_TIMEOUT"); v != "" {
|
||||
if parsed, err := time.ParseDuration(v); err == nil {
|
||||
transactionTimeout = parsed
|
||||
}
|
||||
}
|
||||
log.WithContext(ctx).Infof("Setting transaction timeout to %v", transactionTimeout)
|
||||
|
||||
if storeEngine == types.SqliteStoreEngine {
|
||||
if err == nil {
|
||||
log.WithContext(ctx).Warnf("setting NB_SQL_MAX_OPEN_CONNS is not supported for sqlite, using default value 1")
|
||||
@@ -101,7 +111,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
||||
|
||||
if skipMigration {
|
||||
log.WithContext(ctx).Infof("skipping migration")
|
||||
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1}, nil
|
||||
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1, transactionTimeout: transactionTimeout}, nil
|
||||
}
|
||||
|
||||
if err := migratePreAuto(ctx, db); err != nil {
|
||||
@@ -120,7 +130,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
||||
return nil, fmt.Errorf("migratePostAuto: %w", err)
|
||||
}
|
||||
|
||||
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1}, nil
|
||||
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1, transactionTimeout: transactionTimeout}, nil
|
||||
}
|
||||
|
||||
func GetKeyQueryCondition(s *SqlStore) string {
|
||||
@@ -2897,8 +2907,11 @@ func (s *SqlStore) IncrementNetworkSerial(ctx context.Context, accountId string)
|
||||
}
|
||||
|
||||
func (s *SqlStore) ExecuteInTransaction(ctx context.Context, operation func(store Store) error) error {
|
||||
timeoutCtx, cancel := context.WithTimeout(context.Background(), s.transactionTimeout)
|
||||
defer cancel()
|
||||
|
||||
startTime := time.Now()
|
||||
tx := s.db.Begin()
|
||||
tx := s.db.WithContext(timeoutCtx).Begin()
|
||||
if tx.Error != nil {
|
||||
return tx.Error
|
||||
}
|
||||
@@ -2933,6 +2946,9 @@ func (s *SqlStore) ExecuteInTransaction(ctx context.Context, operation func(stor
|
||||
err := operation(repo)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) {
|
||||
log.WithContext(ctx).Warnf("transaction exceeded %s timeout after %v, stack: %s", s.transactionTimeout, time.Since(startTime), debug.Stack())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2945,13 +2961,19 @@ func (s *SqlStore) ExecuteInTransaction(ctx context.Context, operation func(stor
|
||||
}
|
||||
|
||||
err = tx.Commit().Error
|
||||
if err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) || errors.Is(timeoutCtx.Err(), context.DeadlineExceeded) {
|
||||
log.WithContext(ctx).Warnf("transaction commit exceeded %s timeout after %v, stack: %s", s.transactionTimeout, time.Since(startTime), debug.Stack())
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("transaction took %v", time.Since(startTime))
|
||||
if s.metrics != nil {
|
||||
s.metrics.StoreMetrics().CountTransactionDuration(time.Since(startTime))
|
||||
}
|
||||
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) withTx(tx *gorm.DB) Store {
|
||||
|
||||
@@ -3857,3 +3857,30 @@ func TestSqlStore_ApproveAccountPeers(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestSqlStore_ExecuteInTransaction_Timeout(t *testing.T) {
|
||||
if os.Getenv("NETBIRD_STORE_ENGINE") == "mysql" {
|
||||
t.Skip("Skipping timeout test for MySQL")
|
||||
}
|
||||
|
||||
t.Setenv("NB_STORE_TRANSACTION_TIMEOUT", "1s")
|
||||
|
||||
store, cleanup, err := NewTestStoreFromSQL(context.Background(), "", t.TempDir())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(cleanup)
|
||||
|
||||
sqlStore, ok := store.(*SqlStore)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, 1*time.Second, sqlStore.transactionTimeout)
|
||||
|
||||
ctx := context.Background()
|
||||
err = sqlStore.ExecuteInTransaction(ctx, func(transaction Store) error {
|
||||
// Sleep for 2 seconds to exceed the 1 second timeout
|
||||
time.Sleep(2 * time.Second)
|
||||
return nil
|
||||
})
|
||||
|
||||
// The transaction should fail with an error (either timeout or already rolled back)
|
||||
require.Error(t, err)
|
||||
assert.Contains(t, err.Error(), "transaction has already been committed or rolled back", "expected transaction rolled back error, got: %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user