Merge branch 'feature/net-266-groups-migration' into feature/net-297-network-migration

# Conflicts:
#	management/server/migration/migration.go
#	management/server/peer_test.go
This commit is contained in:
Pascal Fischer
2025-07-04 13:09:01 +02:00
10 changed files with 119 additions and 87 deletions

View File

@@ -1314,7 +1314,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
// Propagate changes to peers if group propagation is enabled
if settings.GroupsPropagationEnabled {
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId)
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, userAuth.AccountId)
if err != nil {
return fmt.Errorf("error getting account groups: %w", err)
}
@@ -1899,7 +1899,7 @@ func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, acc
// propagateUserGroupMemberships propagates all account users' group memberships to their peers.
// Returns true if any groups were modified, true if those updates affect peers and an error.
func propagateUserGroupMemberships(ctx context.Context, transaction store.Store, accountID string) (groupsUpdated bool, peersAffected bool, err error) {
groups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthUpdate, accountID)
groups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return false, false, err
}

View File

@@ -1240,9 +1240,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
manager, account, peer1, peer2, _ := setupNetworkMapTest(t)
group := types.Group{
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID},
AccountID: account.Id,
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID},
}
if err := manager.SaveGroup(context.Background(), account.Id, userID, &group, true); err != nil {
t.Errorf("save group: %v", err)
@@ -1672,9 +1673,10 @@ func TestAccount_Copy(t *testing.T) {
},
Groups: map[string]*types.Group{
"group1": {
ID: "group1",
Peers: []string{"peer1"},
Resources: []types.Resource{},
ID: "group1",
Peers: []string{"peer1"},
Resources: []types.Resource{},
GroupPeers: []types.GroupPeer{},
},
},
Policies: []*types.Policy{
@@ -2616,6 +2618,7 @@ func TestAccount_GetNextInactivePeerExpiration(t *testing.T) {
}
func TestAccount_SetJWTGroups(t *testing.T) {
t.Setenv("NETBIRD_STORE_ENGINE", "postgres")
manager, err := createManager(t)
require.NoError(t, err, "unable to create account manager")

View File

@@ -742,7 +742,6 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
}
func Test_AddPeerToGroup(t *testing.T) {
t.Setenv("NETBIRD_STORE_ENGINE", string(types.PostgresStoreEngine))
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
@@ -758,11 +757,10 @@ func Test_AddPeerToGroup(t *testing.T) {
return
}
const totalPeers = 10000 // totalPeers / differentHostnames should be less than 10 (due to concurrent retries)
const differentHostnames = 50
const totalPeers = 1000
var wg sync.WaitGroup
errs := make(chan error, totalPeers+differentHostnames)
errs := make(chan error, totalPeers)
start := make(chan struct{})
for i := 0; i < totalPeers; i++ {
wg.Add(1)
@@ -797,11 +795,10 @@ func Test_AddPeerToGroup(t *testing.T) {
t.Fatalf("Failed to get account %s: %v", accountID, err)
}
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in account %s, got %d", totalPeers, accountID, len(account.Peers))
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in group %s in account %s, got %d", totalPeers, maps.Values(account.Groups)[0].Name, accountID, len(account.Peers))
}
func Test_AddPeerToAll(t *testing.T) {
t.Setenv("NETBIRD_STORE_ENGINE", string(types.PostgresStoreEngine))
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
@@ -817,11 +814,10 @@ func Test_AddPeerToAll(t *testing.T) {
return
}
const totalPeers = 10000 // totalPeers / differentHostnames should be less than 10 (due to concurrent retries)
const differentHostnames = 50
const totalPeers = 1000
var wg sync.WaitGroup
errs := make(chan error, totalPeers+differentHostnames)
errs := make(chan error, totalPeers)
start := make(chan struct{})
for i := 0; i < totalPeers; i++ {
wg.Add(1)
@@ -856,11 +852,10 @@ func Test_AddPeerToAll(t *testing.T) {
t.Fatalf("Failed to get account %s: %v", accountID, err)
}
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in account %s, got %d", totalPeers, accountID, len(account.Peers))
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in group %s account %s, got %d", totalPeers, maps.Values(account.Groups)[0].Name, accountID, len(account.Peers))
}
func Test_AddPeerAndAddToAll(t *testing.T) {
t.Setenv("NETBIRD_STORE_ENGINE", string(types.PostgresStoreEngine))
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
@@ -876,11 +871,10 @@ func Test_AddPeerAndAddToAll(t *testing.T) {
return
}
const totalPeers = 10000 // totalPeers / differentHostnames should be less than 10 (due to concurrent retries)
const differentHostnames = 50
const totalPeers = 1000
var wg sync.WaitGroup
errs := make(chan error, totalPeers+differentHostnames)
errs := make(chan error, totalPeers)
start := make(chan struct{})
for i := 0; i < totalPeers; i++ {
wg.Add(1)
@@ -893,16 +887,16 @@ func Test_AddPeerAndAddToAll(t *testing.T) {
peer := &peer2.Peer{
ID: strconv.Itoa(i),
AccountID: accountID,
Meta: peer2.PeerSystemMeta{Hostname: "peer" + strconv.Itoa(i)},
DNSLabel: "peer" + strconv.Itoa(i),
IP: uint32ToIP(uint32(i)),
}
err = manager.Store.ExecuteInTransaction(context.Background(), func(transaction store.Store) error {
err = manager.Store.AddPeerToAccount(context.Background(), store.LockingStrengthNone, peer)
err = transaction.AddPeerToAccount(context.Background(), store.LockingStrengthUpdate, peer)
if err != nil {
return fmt.Errorf("AddPeer failed for peer %d: %w", i, err)
}
err = manager.Store.AddPeerToAllGroup(context.Background(), accountID, strconv.Itoa(i))
err = transaction.AddPeerToAllGroup(context.Background(), accountID, peer.ID)
if err != nil {
return fmt.Errorf("AddPeer failed for peer %d: %w", i, err)
}
@@ -931,7 +925,8 @@ func Test_AddPeerAndAddToAll(t *testing.T) {
t.Fatalf("Failed to get account %s: %v", accountID, err)
}
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in account %s, got %d", totalPeers, accountID, len(account.Peers))
assert.Equal(t, totalPeers, len(maps.Values(account.Groups)[0].Peers), "Expected %d peers in group %s in account %s, got %d", totalPeers, maps.Values(account.Groups)[0].Name, accountID, len(account.Peers))
assert.Equal(t, totalPeers, len(account.Peers), "Expected %d peers in account %s, got %d", totalPeers, accountID, len(account.Peers))
}
func uint32ToIP(n uint32) net.IP {
@@ -941,7 +936,6 @@ func uint32ToIP(n uint32) net.IP {
}
func Test_IncrementNetworkSerial(t *testing.T) {
t.Setenv("NETBIRD_STORE_ENGINE", string(types.PostgresStoreEngine))
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
@@ -957,7 +951,7 @@ func Test_IncrementNetworkSerial(t *testing.T) {
return
}
const totalPeers = 3000
const totalPeers = 1000
var wg sync.WaitGroup
errs := make(chan error, totalPeers)
@@ -973,7 +967,7 @@ func Test_IncrementNetworkSerial(t *testing.T) {
err = manager.Store.ExecuteInTransaction(context.Background(), func(transaction store.Store) error {
err = transaction.IncrementNetworkSerial(context.Background(), store.LockingStrengthNone, accountID)
if err != nil {
t.Fatalf("Failed to get account %s: %v", accountID, err)
return fmt.Errorf("failed to get account %s: %v", accountID, err)
}
return nil
})
@@ -1000,5 +994,5 @@ func Test_IncrementNetworkSerial(t *testing.T) {
t.Fatalf("Failed to get account %s: %v", accountID, err)
}
assert.Equal(t, totalPeers, int(account.Network.Serial), "Expected %d peers in account %s, got %d", totalPeers, accountID, account.Network.Serial)
assert.Equal(t, totalPeers, int(account.Network.Serial), "Expected %d serial increases in account %s, got %d", totalPeers, accountID, account.Network.Serial)
}

View File

@@ -475,16 +475,17 @@ func MigrateJsonToTable[T any](ctx context.Context, db *gorm.DB, columnName stri
}
}
if err := tx.Migrator().DropColumn(&model, columnName); err != nil {
return fmt.Errorf("drop column %s: %w", columnName, err)
}
// Todo: Enable this after we are sure that every thing works as expected and we do not need to rollback anymore
// if err := tx.Migrator().DropColumn(&model, columnName); err != nil {
// return fmt.Errorf("drop column %s: %w", columnName, err)
// }
return nil
}); err != nil {
return err
}
log.WithContext(ctx).Infof("Migration of JSON field %s from table %s into seperte table completed", columnName, tableName)
log.WithContext(ctx).Infof("Migration of JSON field %s from table %s into separate table completed", columnName, tableName)
return nil
}
@@ -530,14 +531,14 @@ func MigrateEmbeddedToTable[T any, S any, U any](ctx context.Context, db *gorm.D
return fmt.Errorf("failed to extract column names: %w", err)
}
for _, col := range cols {
if col == pkey {
continue
}
if err := tx.Migrator().DropColumn(&model, col); err != nil {
return fmt.Errorf("failed to drop column %s: %w", col, err)
}
}
// for _, col := range cols {
// if col == pkey {
// continue
// }
// if err := tx.Migrator().DropColumn(&model, col); err != nil {
// return fmt.Errorf("failed to drop column %s: %w", col, err)
// }
// }
return nil
}); err != nil {

View File

@@ -1459,6 +1459,10 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
}
func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
engine := os.Getenv("NETBIRD_STORE_ENGINE")
if engine == "sqlite" || engine == "" {
t.Skip("Skipping test because sqlite test store is not respecting foreign keys")
}
if runtime.GOOS == "windows" {
t.Skip("The SQLite store is not properly supported by Windows yet")
}
@@ -1764,7 +1768,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("adding peer to unlinked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg) //
close(done)
}()
@@ -2167,7 +2171,7 @@ func Test_AddPeer(t *testing.T) {
return
}
const totalPeers = 10000
const totalPeers = 300
var wg sync.WaitGroup
errs := make(chan error, totalPeers)

View File

@@ -186,6 +186,10 @@ func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) erro
generateAccountSQLTypes(account)
for _, group := range account.GroupsG {
group.StoreGroupPeers()
}
err := s.db.Transaction(func(tx *gorm.DB) error {
result := tx.Select(clause.Associations).Delete(account.Policies, "account_id = ?", account.Id)
if result.Error != nil {
@@ -247,7 +251,7 @@ func generateAccountSQLTypes(account *types.Account) {
for id, group := range account.Groups {
group.ID = id
account.GroupsG = append(account.GroupsG, *group)
account.GroupsG = append(account.GroupsG, group)
}
for id, route := range account.Routes {
@@ -455,19 +459,12 @@ func (s *SqlStore) SaveGroups(ctx context.Context, lockStrength LockingStrength,
return nil
}
for _, g := range groups {
g.StoreGroupPeers()
}
return s.db.Transaction(func(tx *gorm.DB) error {
for _, g := range groups {
g.StoreGroupPeers()
if err := tx.Model(&g).
Association("GroupPeers").
Replace(g.GroupPeers); err != nil {
log.WithContext(ctx).Errorf("failed to save group peers to store: %s", err)
return status.Errorf(status.Internal, "failed to save group peers to store")
}
}
result := tx.Session(&gorm.Session{FullSaveAssociations: true}).
result := tx.
Clauses(
clause.Locking{Strength: string(lockStrength)},
clause.OnConflict{
@@ -475,12 +472,25 @@ func (s *SqlStore) SaveGroups(ctx context.Context, lockStrength LockingStrength,
UpdateAll: true,
},
).
Save(&groups)
Create(&groups)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to save groups to store: %v", result.Error)
return status.Errorf(status.Internal, "failed to save groups to store")
}
for _, g := range groups {
if len(g.GroupPeers) == 0 {
if err := tx.Where("group_id = ?", g.ID).Delete(&types.GroupPeer{}).Error; err != nil {
log.WithContext(ctx).Errorf("failed to delete group peers for group %s: %s", g.ID, err)
return status.Errorf(status.Internal, "failed to delete group peers")
}
} else {
if err := tx.Model(&g).Association("GroupPeers").Replace(g.GroupPeers); err != nil {
return status.Errorf(status.Internal, "failed to save group peers: %s", err)
}
}
}
return nil
})
}
@@ -1323,7 +1333,10 @@ func (s *SqlStore) AddPeerToAllGroup(ctx context.Context, accountID string, peer
return status.Errorf(status.NotFound, "group 'All' not found for account %s", accountID)
}
err := s.db.Create(&types.GroupPeer{
err := s.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "group_id"}, {Name: "peer_id"}},
DoNothing: true,
}).Create(&types.GroupPeer{
GroupID: groupID,
PeerID: peerID,
}).Error
@@ -1758,9 +1771,6 @@ func (s *SqlStore) GetGroupByID(ctx context.Context, lockStrength LockingStrengt
// GetGroupByName retrieves a group by name and account ID.
func (s *SqlStore) GetGroupByName(ctx context.Context, lockStrength LockingStrength, accountID, groupName string) (*types.Group, error) {
tx := s.db
if lockStrength != LockingStrengthNone {
tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)})
}
var group types.Group
@@ -1768,7 +1778,14 @@ func (s *SqlStore) GetGroupByName(ctx context.Context, lockStrength LockingStren
// we may need to reconsider changing the types.
query := tx.Preload(clause.Associations)
result := query.First(&group, "account_id = ? AND name = ?", accountID, groupName)
result := query.
Model(&types.Group{}).
Joins("LEFT JOIN group_peers ON group_peers.group_id = groups.id").
Where("groups.account_id = ? AND groups.name = ?", accountID, groupName).
Group("groups.id").
Order("COUNT(group_peers.peer_id) DESC").
Limit(1).
First(&group)
if err := result.Error; err != nil {
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
return nil, status.NewGroupNotFoundError(groupName)
@@ -1776,6 +1793,9 @@ func (s *SqlStore) GetGroupByName(ctx context.Context, lockStrength LockingStren
log.WithContext(ctx).Errorf("failed to get group by name from store: %v", result.Error)
return nil, status.Errorf(status.Internal, "failed to get group by name from store")
}
group.LoadGroupPeers()
return &group, nil
}
@@ -1808,23 +1828,25 @@ func (s *SqlStore) SaveGroup(ctx context.Context, lockStrength LockingStrength,
return status.Errorf(status.InvalidArgument, "group is nil")
}
group = group.Copy()
group.StoreGroupPeers()
tx := s.db
if lockStrength != LockingStrengthNone {
tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)})
}
if err := tx.Model(group).Association("Peers").Replace(group.Peers); err != nil {
log.WithContext(ctx).Errorf("failed to replace peers for group %s: %v", group.ID, err)
return status.Errorf(status.Internal, "failed to sync group peers")
}
if err := tx.Save(group).Error; err != nil {
if err := s.db.Omit(clause.Associations).Save(group).Error; err != nil {
log.WithContext(ctx).Errorf("failed to save group to store: %v", err)
return status.Errorf(status.Internal, "failed to save group to store")
}
if len(group.GroupPeers) == 0 {
if err := s.db.Where("group_id = ?", group.ID).Delete(&types.GroupPeer{}).Error; err != nil {
log.WithContext(ctx).Errorf("failed to delete group peers for group %s: %s", group.ID, err)
return status.Errorf(status.Internal, "failed to delete group peers")
}
} else {
if err := s.db.Model(&group).Association("GroupPeers").Replace(group.GroupPeers); err != nil {
return status.Errorf(status.Internal, "failed to save group peers: %s", err)
}
}
return nil
}

View File

@@ -1340,10 +1340,12 @@ func TestSqlStore_SaveGroup(t *testing.T) {
accountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
group := &types.Group{
ID: "group-id",
AccountID: accountID,
Issued: "api",
Peers: []string{"peer1", "peer2"},
ID: "group-id",
AccountID: accountID,
Issued: "api",
Peers: []string{"peer1", "peer2"},
Resources: []types.Resource{},
GroupPeers: []types.GroupPeer{},
}
err = store.SaveGroup(context.Background(), LockingStrengthUpdate, group)
require.NoError(t, err)
@@ -1362,16 +1364,19 @@ func TestSqlStore_SaveGroups(t *testing.T) {
groups := []*types.Group{
{
ID: "group-1",
AccountID: accountID,
Issued: "api",
Peers: []string{"peer1", "peer2"},
ID: "group-1",
AccountID: accountID,
Issued: "api",
Peers: []string{"peer1", "peer2"},
Resources: []types.Resource{},
GroupPeers: []types.GroupPeer{},
},
{
ID: "group-2",
AccountID: accountID,
Issued: "integration",
Peers: []string{"peer3", "peer4"},
Resources: []types.Resource{},
},
}
err = store.SaveGroups(context.Background(), LockingStrengthUpdate, accountID, groups)

View File

@@ -33,8 +33,8 @@ INSERT INTO users VALUES('edafee4e-63fb-11ec-90d6-0242ac120003','bf1c8084-ba50-4
INSERT INTO users VALUES('f4f6d672-63fb-11ec-90d6-0242ac120003','bf1c8084-ba50-4ce7-9439-34653001fc3b','user',0,0,'','[]',0,NULL,'2024-10-02 16:01:38.210678+02:00','api',0,'');
INSERT INTO personal_access_tokens VALUES('9dj38s35-63fb-11ec-90d6-0242ac120003','f4f6d672-63fb-11ec-90d6-0242ac120003','','SoMeHaShEdToKeN','2023-02-27 00:00:00+00:00','user','2023-01-01 00:00:00+00:00','2023-02-01 00:00:00+00:00');
INSERT INTO "groups" VALUES('cfefqs706sqkneg59g4g','bf1c8084-ba50-4ce7-9439-34653001fc3b','All','api','[]',0,'');
INSERT INTO "groups" VALUES('cfefqs706sqkneg59g3g','bf1c8084-ba50-4ce7-9439-34653001fc3b','AwesomeGroup1','api','["peer1"]',0,'');
INSERT INTO "groups" VALUES('cfefqs706sqkneg59g2g','bf1c8084-ba50-4ce7-9439-34653001fc3b','AwesomeGroup2','api','["peer1","peer2","peer3"]',0,'');
INSERT INTO "groups" VALUES('cfefqs706sqkneg59g3g','bf1c8084-ba50-4ce7-9439-34653001fc3b','AwesomeGroup1','api','[]',0,'');
INSERT INTO "groups" VALUES('cfefqs706sqkneg59g2g','bf1c8084-ba50-4ce7-9439-34653001fc3b','AwesomeGroup2','api','[]',0,'');
INSERT INTO posture_checks VALUES('csplshq7qv948l48f7t0','NetBird Version > 0.32.0','','bf1c8084-ba50-4ce7-9439-34653001fc3b','{"NBVersionCheck":{"MinVersion":"0.31.0"}}');
INSERT INTO posture_checks VALUES('cspnllq7qv95uq1r4k90','Allow Berlin and Deny local network 172.16.1.0/24','','bf1c8084-ba50-4ce7-9439-34653001fc3b','{"GeoLocationCheck":{"Locations":[{"CountryCode":"DE","CityName":"Berlin"}],"Action":"allow"},"PeerNetworkRangeCheck":{"Action":"deny","Ranges":["172.16.1.0/24"]}}');
INSERT INTO name_server_groups VALUES('csqdelq7qv97ncu7d9t0','bf1c8084-ba50-4ce7-9439-34653001fc3b','Google DNS','Google DNS Servers','[{"IP":"8.8.8.8","NSType":1,"Port":53},{"IP":"8.8.4.4","NSType":1,"Port":53}]','["cfefqs706sqkneg59g2g"]',1,'[]',1,0);

View File

@@ -73,7 +73,7 @@ type Account struct {
Users map[string]*User `gorm:"-"`
UsersG []User `json:"-" gorm:"foreignKey:AccountID;references:id"`
Groups map[string]*Group `gorm:"-"`
GroupsG []Group `json:"-" gorm:"foreignKey:AccountID;references:id"`
GroupsG []*Group `json:"-" gorm:"foreignKey:AccountID;references:id"`
Policies []*Policy `gorm:"foreignKey:AccountID;references:id"`
Routes map[route.ID]*route.Route `gorm:"-"`
RoutesG []route.Route `json:"-" gorm:"foreignKey:AccountID;references:id"`

View File

@@ -45,7 +45,7 @@ func (g *Group) LoadGroupPeers() {
for i, peer := range g.GroupPeers {
g.Peers[i] = peer.PeerID
}
// g.GroupPeers = nil
g.GroupPeers = []GroupPeer{}
}
func (g *Group) StoreGroupPeers() {
g.GroupPeers = make([]GroupPeer, len(g.Peers))
@@ -55,7 +55,7 @@ func (g *Group) StoreGroupPeers() {
PeerID: peer,
}
}
// g.Peers = nil
g.Peers = []string{}
}
// EventMeta returns activity event meta related to the group
@@ -70,13 +70,16 @@ func (g *Group) EventMetaResource(resource *types.NetworkResource) map[string]an
func (g *Group) Copy() *Group {
group := &Group{
ID: g.ID,
AccountID: g.AccountID,
Name: g.Name,
Issued: g.Issued,
Peers: make([]string, len(g.Peers)),
GroupPeers: make([]GroupPeer, len(g.GroupPeers)),
Resources: make([]Resource, len(g.Resources)),
IntegrationReference: g.IntegrationReference,
}
copy(group.Peers, g.Peers)
copy(group.GroupPeers, g.GroupPeers)
copy(group.Resources, g.Resources)
return group
}