Compare commits

...

17 Commits

Author SHA1 Message Date
Pascal Fischer
5d34804062 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:21:17 +02:00
Pascal Fischer
418d23d7f8 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:20:41 +02:00
Pascal Fischer
b0582c86ba update config if exist 2025-06-28 17:59:35 +02:00
Pascal Fischer
1b7c787cc5 fix tests 2025-06-28 16:08:25 +02:00
Pascal Fischer
ad50e07325 fix testing tool and remove unused const 2025-06-28 15:45:07 +02:00
Pascal Fischer
c66bd0cc71 fix tests 2025-06-28 15:33:49 +02:00
Pascal Fischer
6f0111eab0 fix tests 2025-06-25 16:38:49 +02:00
Pascal Fischer
d403d20e7b fix testing tools 2025-06-25 15:37:31 +02:00
Pascal Fischer
c043018939 fix tests 2025-06-25 12:27:55 +02:00
Pascal Fischer
ed6ed4a597 add metrics 2025-06-25 12:09:06 +02:00
Pascal Fischer
905a3481ec Merge branch 'refs/heads/main' into feature/limit-update-channel 2025-06-25 12:05:33 +02:00
Pascal Fischer
47052fa024 Merge branch 'main' into feature/limit-update-channel 2025-06-25 11:32:30 +02:00
Pascal Fischer
4795e2fbc4 increment network serial on peer meta changed 2025-06-19 12:46:50 +02:00
Pascal Fischer
ec57c685a9 increment network serial on peer meta changed 2025-06-19 12:41:03 +02:00
Pascal Fischer
4b44b8c46c increment network serial 2025-06-19 12:14:26 +02:00
Pascal Fischer
0c7cac81f0 use update buffer instead of channel 2025-06-17 15:44:14 +02:00
Pascal Fischer
5e9ea122f7 limit channel to 2 messages and drop outdated if needed 2025-06-17 15:00:23 +02:00
24 changed files with 694 additions and 162 deletions

View File

@@ -83,7 +83,6 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, nil return nil, nil
@@ -92,6 +91,9 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err) require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)

View File

@@ -1456,16 +1456,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore) ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -191,16 +191,16 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore) ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -68,13 +68,13 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err) require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -1624,6 +1624,10 @@ func (am *DefaultAccountManager) GetDNSDomain(settings *types.Settings) string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }

View File

@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
nbdns "github.com/netbirdio/netbird/dns" nbdns "github.com/netbirdio/netbird/dns"
@@ -1186,7 +1187,10 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 { if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1213,7 +1217,10 @@ func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 { if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -1249,7 +1256,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 { if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1314,7 +1324,10 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 { if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
@@ -1365,15 +1378,24 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
return return
} }
// emptying buffer of previous changes
_, _ = updMsg.Pop(context.Background())
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg // expecting 2 messages (policy delete and group delete)
networkMap := message.Update.GetNetworkMap() for i := 0; i < 1; i++ {
if len(networkMap.RemotePeers) != 0 { message, ok := updMsg.Pop(context.Background())
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
} }
}() }()
@@ -2879,7 +2901,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -2960,23 +2982,58 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
return manager, account, peer1, peer2, peer3 return manager, account, peer1, peer2, peer3
} }
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { func peerShouldNotReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
}
t.Errorf("Unexpected message received: %+v", msg) t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
return return
} }
} }
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { func peerShouldReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if msg == nil { if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
return
}
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message") t.Errorf("Received nil update message, expected valid message")
return
} }
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for update message") t.Error("Timed out waiting for update message")
@@ -3017,9 +3074,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -3085,9 +3146,13 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -3160,9 +3225,13 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels

View File

@@ -217,7 +217,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createDNSStore(t *testing.T) (store.Store, error) { func createDNSStore(t *testing.T) (store.Store, error) {
@@ -507,13 +507,12 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates // Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates
t.Run("saving dns setting with unused groups", func(t *testing.T) { t.Run("saving dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -534,6 +533,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have no peers should not update account peers or send peer update // Creating DNS settings with groups that have no peers should not update account peers or send peer update
t.Run("creating dns setting with unused groups", func(t *testing.T) { t.Run("creating dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -560,6 +563,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have peers should update account peers and send peer update // Creating DNS settings with groups that have peers should update account peers and send peer update
t.Run("creating dns setting with used groups", func(t *testing.T) { t.Run("creating dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{ err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA", ID: "groupA",
Name: "GroupA", Name: "GroupA",
@@ -567,6 +574,8 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
_, _ = updMsg.Pop(context.Background())
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -593,6 +602,18 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Saving DNS settings with groups that have peers should update account peers and send peer update // Saving DNS settings with groups that have peers should update account peers and send peer update
t.Run("saving dns setting with used groups", func(t *testing.T) { t.Run("saving dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}, true)
assert.NoError(t, err)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -613,6 +634,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates // Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates
t.Run("removing group with no peers from dns settings", func(t *testing.T) { t.Run("removing group with no peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -633,6 +658,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with peers from DNS settings should trigger updates to account peers and send peer updates // Removing group with peers from DNS settings should trigger updates to account peers and send peer updates
t.Run("removing group with peers from dns settings", func(t *testing.T) { t.Run("removing group with peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -429,13 +429,12 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving a group that is not linked to any resource should not update account peers // Saving a group that is not linked to any resource should not update account peers
t.Run("saving unlinked group", func(t *testing.T) { t.Run("saving unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -459,6 +458,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Adding a peer to a group that is not linked to any resource should not update account peers // Adding a peer to a group that is not linked to any resource should not update account peers
// and not send peer update // and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -478,6 +481,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Removing a peer from a group that is not linked to any resource should not update account peers // Removing a peer from a group that is not linked to any resource should not update account peers
// and not send peer update // and not send peer update
t.Run("removing peer from unliked group", func(t *testing.T) { t.Run("removing peer from unliked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -496,6 +503,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Deleting group should not update account peers and not send peer update // Deleting group should not update account peers and not send peer update
t.Run("deleting group", func(t *testing.T) { t.Run("deleting group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -529,6 +540,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to policy should update account peers and send peer update // Saving a group linked to policy should update account peers and send peer update
t.Run("saving linked group to policy", func(t *testing.T) { t.Run("saving linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -551,6 +566,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// adding peer to a used group should update account peers and send peer update // adding peer to a used group should update account peers and send peer update
t.Run("adding peer to linked group", func(t *testing.T) { t.Run("adding peer to linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -569,6 +588,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// removing peer from a linked group should update account peers and send peer update // removing peer from a linked group should update account peers and send peer update
t.Run("removing peer from linked group", func(t *testing.T) { t.Run("removing peer from linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -587,6 +610,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to name server group should update account peers and send peer update // Saving a group linked to name server group should update account peers and send peer update
t.Run("saving group linked to name server group", func(t *testing.T) { t.Run("saving group linked to name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup( _, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{ context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"), IP: netip.MustParseAddr("1.1.1.1"),
@@ -620,6 +647,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to route should update account peers and send peer update // Saving a group linked to route should update account peers and send peer update
t.Run("saving group linked to route", func(t *testing.T) { t.Run("saving group linked to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
newRoute := route.Route{ newRoute := route.Route{
ID: "route", ID: "route",
Network: netip.MustParsePrefix("192.168.0.0/16"), Network: netip.MustParsePrefix("192.168.0.0/16"),
@@ -661,6 +692,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to dns settings should update account peers and send peer update // Saving a group linked to dns settings should update account peers and send peer update
t.Run("saving group linked to dns settings", func(t *testing.T) { t.Run("saving group linked to dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{ err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{
DisabledManagementGroups: []string{"groupD"}, DisabledManagementGroups: []string{"groupD"},
}) })
@@ -688,6 +723,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to network router should update account peers and send peer update // Saving a group linked to network router should update account peers and send peer update
t.Run("saving group linked to network router", func(t *testing.T) { t.Run("saving group linked to network router", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
permissionsManager := permissions.NewManager(manager.Store) permissionsManager := permissions.NewManager(manager.Store)
groupsManager := groups.NewManager(manager.Store, permissionsManager, manager) groupsManager := groups.NewManager(manager.Store, permissionsManager, manager)
resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager) resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager)

View File

@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err return err
} }
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID) updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer) s.ephemeralManager.OnPeerConnected(ctx, peer)
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv) return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
} }
// handleUpdates sends updates to the connected peer until the updates channel is closed. // handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String()) log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for { for {
select { update, ok := updates.Pop(ctx)
// condition when there are some updates if !ok {
case update, open := <-updates: log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
}
if !open {
log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
// condition when client <-> server connection has been terminated
case <-srv.Context().Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer) s.cancelPeerRoutines(ctx, accountID, peer)
return srv.Context().Err() return nil
}
log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
} }
} }
} }

View File

@@ -118,7 +118,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
t.Fatalf("Failed to create metrics: %v", err) t.Fatalf("Failed to create metrics: %v", err)
} }
peersUpdateManager := server.NewPeersUpdateManager(nil) peersUpdateManager := server.NewPeersUpdateManager(metrics)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId) updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{}) done := make(chan struct{})
if validateUpdate { if validateUpdate {
@@ -166,24 +166,54 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
return apiHandler, am, done return apiHandler, am, done
} }
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) { func peerShouldNotReceiveUpdate(t TB, buffer *server.UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
t.Errorf("Unexpected message received: %+v", msg) t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
return return
} }
} }
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) { func peerShouldReceiveUpdate(t TB, buffer *server.UpdateBuffer, expected *server.UpdateMessage) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if msg == nil { if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message") t.Errorf("Received nil update message, expected valid message")
} }
if !msg.ok {
t.Errorf("Expected to receive an update message, but got ok = false")
}
assert.Equal(t, expected, msg) assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message") t.Errorf("Timed out waiting for update message")

View File

@@ -424,14 +424,14 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
t.Fatal(err) t.Fatal(err)
} }
peersUpdateManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -173,14 +173,14 @@ func startServer(
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
} }
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil { if err != nil {
t.Fatalf("failed creating metrics: %v", err) t.Fatalf("failed creating metrics: %v", err)
} }
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -779,7 +779,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createNSStore(t *testing.T) (store.Store, error) { func createNSStore(t *testing.T) (store.Store, error) {
@@ -988,14 +988,14 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Creating a nameserver group with a distribution group no peers should not update account peers // Creating a nameserver group with a distribution group no peers should not update account peers
// and not send peer update // and not send peer update
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) { t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1023,6 +1023,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with no peers should not update account peers // saving a nameserver group with a distribution group with no peers should not update account peers
// and not send peer update // and not send peer update
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) { t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1041,6 +1046,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Creating a nameserver group with a distribution group no peers should update account peers and send peer update // Creating a nameserver group with a distribution group no peers should update account peers and send peer update
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) { t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1067,6 +1077,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with peers should update account peers and send peer update // saving a nameserver group with a distribution group with peers should update account peers and send peer update
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) { t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1097,6 +1111,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Deleting a nameserver group should update account peers and send peer update // Deleting a nameserver group should update account peers and send peer update
t.Run("deleting nameserver group", func(t *testing.T) { t.Run("deleting nameserver group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -144,6 +144,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired { if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from // we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again. // the expired one. Here we notify them that connection is now allowed again.
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
@@ -270,6 +274,11 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
inactivityExpirationChanged = true inactivityExpirationChanged = true
} }
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return err
}
return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer) return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer)
}) })
if err != nil { if err != nil {
@@ -755,6 +764,13 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
return err return err
} }
} }
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID); err != nil {
return err
}
}
return nil return nil
}) })
if err != nil { if err != nil {
@@ -888,6 +904,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
} }
} }
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err)
}
}
return nil return nil
}) })
if err != nil { if err != nil {

View File

@@ -19,6 +19,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
@@ -963,10 +964,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -1028,17 +1033,24 @@ func TestUpdateAccountPeers(t *testing.T) {
t.Fatalf("Failed to get account: %v", err) t.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
t.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
manager.UpdateAccountPeers(ctx, account.Id) manager.UpdateAccountPeers(ctx, account.Id)
for _, channel := range peerChannels { for _, channel := range peerChannels {
update := <-channel update, ok := channel.Pop(context.Background())
if !ok {
t.Fatalf("Expected update for peer, but channel is empty")
}
assert.Nil(t, update.Update.NetbirdConfig) assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers)) assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules)) assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
@@ -1267,7 +1279,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1342,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1477,7 +1489,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1546,7 +1558,7 @@ func Test_LoginPeer(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1734,13 +1746,12 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
var peer5 *nbpeer.Peer var peer5 *nbpeer.Peer
var peer6 *nbpeer.Peer var peer6 *nbpeer.Peer
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update // Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) { t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1759,6 +1770,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to unlinked group should not update account peers and not send peer update // Adding peer to unlinked group should not update account peers and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1784,6 +1799,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with unlinked group should not update account peers and not send peer update // Deleting peer with unlinked group should not update account peers and not send peer update
t.Run("deleting peer with unlinked group", func(t *testing.T) { t.Run("deleting peer with unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1802,6 +1821,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Updating peer label should update account peers and send peer update // Updating peer label should update account peers and send peer update
t.Run("updating peer label", func(t *testing.T) { t.Run("updating peer label", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1820,6 +1843,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
}) })
t.Run("validator requires update", func(t *testing.T) { t.Run("validator requires update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) { requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, true, nil return update, true, nil
} }
@@ -1842,6 +1869,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
}) })
t.Run("validator requires no update", func(t *testing.T) { t.Run("validator requires no update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) { requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, false, nil return update, false, nil
} }
@@ -1865,6 +1896,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with policy should update account peers and send peer update // Adding peer to group linked with policy should update account peers and send peer update
t.Run("adding peer to group linked with policy", func(t *testing.T) { t.Run("adding peer to group linked with policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
AccountID: account.Id, AccountID: account.Id,
Enabled: true, Enabled: true,
@@ -1906,6 +1941,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to policy should update account peers and send peer update // Deleting peer with linked group to policy should update account peers and send peer update
t.Run("deleting peer with linked group to policy", func(t *testing.T) { t.Run("deleting peer with linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1924,6 +1963,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with route should update account peers and send peer update // Adding peer to group linked with route should update account peers and send peer update
t.Run("adding peer to group linked with route", func(t *testing.T) { t.Run("adding peer to group linked with route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
route := nbroute.Route{ route := nbroute.Route{
ID: "testingRoute1", ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"), Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -1970,6 +2013,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to route should update account peers and send peer update // Deleting peer with linked group to route should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1988,6 +2035,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with name server group should update account peers and send peer update // Adding peer to group linked with name server group should update account peers and send peer update
t.Run("adding peer to group linked with name server group", func(t *testing.T) { t.Run("adding peer to group linked with name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup( _, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{ context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"), IP: netip.MustParseAddr("1.1.1.1"),
@@ -2025,6 +2076,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to name server group should update account peers and send peer update // Deleting peer with linked group to name server group should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -1017,17 +1017,16 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
var policyWithGroupRulesNoPeers *types.Policy var policyWithGroupRulesNoPeers *types.Policy
var policyWithDestinationPeersOnly *types.Policy var policyWithDestinationPeersOnly *types.Policy
var policyWithSourceAndDestinationPeers *types.Policy var policyWithSourceAndDestinationPeers *types.Policy
// Saving policy with rule groups with no peers should not update account's peers and not send peer update // Saving policy with rule groups with no peers should not update account's peers and not send peer update
t.Run("saving policy with rule groups with no peers", func(t *testing.T) { t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1059,6 +1058,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with source group containing peers, but destination group without peers should // Saving policy with source group containing peers, but destination group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) { t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1091,6 +1094,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination group containing peers, but source group without peers should // Saving policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) { t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1123,6 +1130,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination and source groups containing peers should update account's peers // Saving policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) { t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1154,6 +1165,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Disabling policy with destination and source groups containing peers should update account's peers // Disabling policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) { t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1174,6 +1189,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Updating disabled policy with destination and source groups containing peers should not update account's peers // Updating disabled policy with destination and source groups containing peers should not update account's peers
// or send peer update // or send peer update
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) { t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1195,6 +1214,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Enabling policy with destination and source groups containing peers should update account's peers // Enabling policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) { t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1214,6 +1237,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy should trigger account peers update and send peer update // Deleting policy should trigger account peers update and send peer update
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) { t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1234,6 +1261,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with destination group containing peers, but source group without peers should // Deleting policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) { t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1252,6 +1283,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with no peers in groups should not update account's peers and not send peer update // Deleting policy with no peers in groups should not update account's peers and not send peer update
t.Run("deleting policy with no peers in groups", func(t *testing.T) { t.Run("deleting policy with no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)

View File

@@ -140,11 +140,6 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckA := &posture.Checks{ postureCheckA := &posture.Checks{
Name: "postureCheckA", Name: "postureCheckA",
AccountID: account.Id, AccountID: account.Id,
@@ -171,6 +166,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Saving unused posture check should not update account peers and not send peer update // Saving unused posture check should not update account peers and not send peer update
t.Run("saving unused posture check", func(t *testing.T) { t.Run("saving unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -189,6 +188,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating unused posture check should not update account peers and not send peer update // Updating unused posture check should not update account peers and not send peer update
t.Run("updating unused posture check", func(t *testing.T) { t.Run("updating unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -226,6 +229,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Linking posture check to policy should trigger update account peers and send peer update // Linking posture check to policy should trigger update account peers and send peer update
t.Run("linking posture check to policy with peers", func(t *testing.T) { t.Run("linking posture check to policy with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -244,6 +251,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture checks should update account peers and send peer update // Updating linked posture checks should update account peers and send peer update
t.Run("updating linked to posture check with peers", func(t *testing.T) { t.Run("updating linked to posture check with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckB.Checks = posture.ChecksDefinition{ postureCheckB.Checks = posture.ChecksDefinition{
NBVersionCheck: &posture.NBVersionCheck{ NBVersionCheck: &posture.NBVersionCheck{
MinVersion: "0.29.0", MinVersion: "0.29.0",
@@ -273,6 +284,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Removing posture check from policy should trigger account peers update and send peer update // Removing posture check from policy should trigger account peers update and send peer update
t.Run("removing posture check from policy", func(t *testing.T) { t.Run("removing posture check from policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -292,6 +307,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Deleting unused posture check should not trigger account peers update and not send peer update // Deleting unused posture check should not trigger account peers update and not send peer update
t.Run("deleting unused posture check", func(t *testing.T) { t.Run("deleting unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -313,6 +332,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update // Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update
t.Run("updating linked posture check to policy with no peers", func(t *testing.T) { t.Run("updating linked posture check to policy with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true, Enabled: true,
Rules: []*types.PolicyRule{ Rules: []*types.PolicyRule{
@@ -352,7 +375,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy where destination has peers but source does not // Updating linked posture check to policy where destination has peers but source does not
// should trigger account peers update and send peer update // should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) { t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) {
updMsg1 := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID) updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
t.Cleanup(func() { t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID) manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID)
}) })
@@ -374,7 +397,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg1) peerShouldReceiveUpdate(t, updMsg)
close(done) close(done)
}() }()
@@ -396,6 +419,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked client posture check to policy where source has peers but destination does not, // Updating linked client posture check to policy where source has peers but destination does not,
// should trigger account peers update and send peer update // should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) { t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true, Enabled: true,
Rules: []*types.PolicyRule{ Rules: []*types.PolicyRule{

View File

@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createRouterStore(t *testing.T) (store.Store, error) { func createRouterStore(t *testing.T) (store.Store, error) {
@@ -1972,13 +1972,12 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
// Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update // Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update
t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) { t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{ route := route.Route{
ID: "testingRoute1", ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"), Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -2015,6 +2014,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating a route with no routing peer and having peers in groups should update account peers and send peer update // Creating a route with no routing peer and having peers in groups should update account peers and send peer update
t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) { t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{ route := route.Route{
ID: "testingRoute2", ID: "testingRoute2",
Network: netip.MustParsePrefix("192.0.2.0/32"), Network: netip.MustParsePrefix("192.0.2.0/32"),
@@ -2064,6 +2067,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating route should update account peers and send peer update // Creating route should update account peers and send peer update
t.Run("creating route with a routing peer", func(t *testing.T) { t.Run("creating route with a routing peer", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2089,6 +2096,11 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("updating route", func(t *testing.T) { t.Run("updating route", func(t *testing.T) {
baseRoute.Groups = []string{routeGroup1, routeGroup2} baseRoute.Groups = []string{routeGroup1, routeGroup2}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2107,6 +2119,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Deleting the route should update account peers and send peer update // Deleting the route should update account peers and send peer update
t.Run("deleting route", func(t *testing.T) { t.Run("deleting route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2125,6 +2141,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route peer groups that do not have any peers should update account peers and send peer update // Adding peer to route peer groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) { t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{ newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.12.0/16"), Network: netip.MustParsePrefix("192.168.12.0/16"),
NetID: "superNet", NetID: "superNet",
@@ -2165,6 +2185,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route groups that do not have any peers should update account peers and send peer update // Adding peer to route groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) { t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{ newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.13.0/16"), Network: netip.MustParsePrefix("192.168.13.0/16"),
NetID: "superNet", NetID: "superNet",

View File

@@ -22,6 +22,9 @@ type UpdateChannelMetrics struct {
calcPeerNetworkMapDurationMs metric.Int64Histogram calcPeerNetworkMapDurationMs metric.Int64Histogram
mergeNetworkMapDurationMicro metric.Int64Histogram mergeNetworkMapDurationMicro metric.Int64Histogram
toSyncResponseDurationMicro metric.Int64Histogram toSyncResponseDurationMicro metric.Int64Histogram
bufferPushCounter metric.Int64Counter
bufferOverwriteCounter metric.Int64Counter
bufferIgnoreCounter metric.Int64Counter
ctx context.Context ctx context.Context
} }
@@ -125,6 +128,27 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err return nil, err
} }
bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates pushed to an empty buffer"))
if err != nil {
return nil, err
}
bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates overwriting old unsent updates in the buffer"))
if err != nil {
return nil, err
}
bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates being ignored due to old network serial"))
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{ return &UpdateChannelMetrics{
createChannelDurationMicro: createChannelDurationMicro, createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMicro: closeChannelDurationMicro, closeChannelDurationMicro: closeChannelDurationMicro,
@@ -138,6 +162,9 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs, calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs,
mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro, mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro,
toSyncResponseDurationMicro: toSyncResponseDurationMicro, toSyncResponseDurationMicro: toSyncResponseDurationMicro,
bufferPushCounter: bufferPushCounter,
bufferOverwriteCounter: bufferOverwriteCounter,
bufferIgnoreCounter: bufferIgnoreCounter,
ctx: ctx, ctx: ctx,
}, nil }, nil
} }
@@ -193,3 +220,18 @@ func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time.
func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) { func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) {
metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds())
} }
// CountBufferPush counts how many buffer push operations are happening on an empty buffer
func (metrics *UpdateChannelMetrics) CountBufferPush() {
metrics.bufferPushCounter.Add(metrics.ctx, 1)
}
// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer
func (metrics *UpdateChannelMetrics) CountBufferOverwrite() {
metrics.bufferOverwriteCounter.Add(metrics.ctx, 1)
}
// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed
func (metrics *UpdateChannelMetrics) CountBufferIgnore() {
metrics.bufferIgnoreCounter.Add(metrics.ctx, 1)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/settings" "github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
) )
@@ -29,7 +30,11 @@ var TurnTestHost = &types.Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) { func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
rc := &types.Relay{ rc := &types.Relay{
Addresses: []string{"localhost:0"}, Addresses: []string{"localhost:0"},
@@ -77,9 +82,25 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second} ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer" peer := "some_peer"
updateChannel := peersManager.CreateChannel(context.Background(), peer) buffer := peersManager.CreateChannel(context.Background(), peer)
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
rc := &types.Relay{ rc := &types.Relay{
Addresses: []string{"localhost:0"}, Addresses: []string{"localhost:0"},
@@ -117,8 +138,8 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
loop: loop:
for timeout := time.After(5 * time.Second); ; { for timeout := time.After(5 * time.Second); ; {
select { select {
case update := <-updateChannel: case update := <-resultCh:
updates = append(updates, update) updates = append(updates, update.msg)
case <-timeout: case <-timeout:
break loop break loop
} }
@@ -181,7 +202,11 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer" peer := "some_peer"
rc := &types.Relay{ rc := &types.Relay{

View File

@@ -0,0 +1,93 @@
package server
import (
"context"
"sync"
"github.com/netbirdio/netbird/management/server/telemetry"
)
type UpdateBuffer struct {
mu sync.Mutex
cond *sync.Cond
update *UpdateMessage
closed bool
metrics *telemetry.UpdateChannelMetrics
}
func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
ub := &UpdateBuffer{metrics: metrics}
ub.cond = sync.NewCond(&ub.mu)
return ub
}
func (b *UpdateBuffer) Push(update *UpdateMessage) {
b.mu.Lock()
defer b.mu.Unlock()
if b.update != nil && update.Update.NetbirdConfig != nil {
if update.Update.NetbirdConfig.Relay != nil {
b.update.Update.NetbirdConfig.Relay = update.Update.NetbirdConfig.Relay
}
if update.Update.NetbirdConfig.Signal != nil {
b.update.Update.NetbirdConfig.Signal = update.Update.NetbirdConfig.Signal
}
if update.Update.NetbirdConfig.Flow != nil {
b.update.Update.NetbirdConfig.Flow = update.Update.NetbirdConfig.Flow
}
if update.Update.NetbirdConfig.Stuns != nil {
b.update.Update.NetbirdConfig.Stuns = update.Update.NetbirdConfig.Stuns
}
if update.Update.NetbirdConfig.Turns != nil {
b.update.Update.NetbirdConfig.Turns = update.Update.NetbirdConfig.Turns
}
}
// the equal case we need because we don't always increment the serial number
if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
b.update = update
b.cond.Signal()
if b.update == nil {
b.metrics.CountBufferPush()
return
}
b.metrics.CountBufferOverwrite()
return
}
b.metrics.CountBufferIgnore()
}
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
b.mu.Lock()
defer b.mu.Unlock()
for b.update == nil && !b.closed {
waitCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
b.cond.Broadcast()
case <-waitCh:
// noop
}
}()
b.cond.Wait()
close(waitCh)
}
if b.closed {
return nil, false
}
msg := b.update
b.update = nil
return msg, true
}
func (b *UpdateBuffer) Close() {
b.mu.Lock()
b.closed = true
b.cond.Broadcast()
b.mu.Unlock()
}

View File

@@ -12,8 +12,6 @@ import (
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
) )
const channelBufferSize = 100
type UpdateMessage struct { type UpdateMessage struct {
Update *proto.SyncResponse Update *proto.SyncResponse
NetworkMap *types.NetworkMap NetworkMap *types.NetworkMap
@@ -21,7 +19,7 @@ type UpdateMessage struct {
type PeersUpdateManager struct { type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID // peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage peerChannels map[string]*UpdateBuffer
// channelsMux keeps the mutex to access peerChannels // channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex channelsMux *sync.RWMutex
// metrics provides method to collect application metrics // metrics provides method to collect application metrics
@@ -31,7 +29,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager // NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager { func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{ return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage), peerChannels: make(map[string]*UpdateBuffer),
channelsMux: &sync.RWMutex{}, channelsMux: &sync.RWMutex{},
metrics: metrics, metrics: metrics,
} }
@@ -53,20 +51,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
found = true found = true
select { channel.Push(update)
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
}
} else { } else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID) log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
} }
} }
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage { func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
start := time.Now() start := time.Now()
closed := false closed := false
@@ -81,22 +73,22 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
closed = true closed = true
channel.Close()
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel)
} }
// mbragin: todo shouldn't it be more? or configurable? // mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize) buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics())
p.peerChannels[peerID] = channel p.peerChannels[peerID] = buffer
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
return channel return buffer
} }
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel) channel.Close()
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return return

View File

@@ -6,13 +6,19 @@ import (
"time" "time"
"github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry"
) )
// var peersUpdater *PeersUpdateManager // var peersUpdater *PeersUpdateManager
func TestCreateChannel(t *testing.T) { func TestCreateChannel(t *testing.T) {
peer := "test-create" peer := "test-create"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
defer peersUpdater.CloseChannel(context.Background(), peer) defer peersUpdater.CloseChannel(context.Background(), peer)
_ = peersUpdater.CreateChannel(context.Background(), peer) _ = peersUpdater.CreateChannel(context.Background(), peer)
@@ -23,7 +29,12 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) { func TestSendUpdate(t *testing.T) {
peer := "test-sendupdate" peer := "test-sendupdate"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
update1 := &UpdateMessage{Update: &proto.SyncResponse{ update1 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{ NetworkMap: &proto.NetworkMap{
Serial: 0, Serial: 0,
@@ -33,41 +44,62 @@ func TestSendUpdate(t *testing.T) {
if _, ok := peersUpdater.peerChannels[peer]; !ok { if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel") t.Error("Error creating the channel")
} }
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
for {
msg, ok := peersUpdater.peerChannels[peer].Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}
}()
peersUpdater.SendUpdate(context.Background(), peer, update1) peersUpdater.SendUpdate(context.Background(), peer, update1)
select { select {
case <-peersUpdater.peerChannels[peer]: case <-resultCh:
default: case <-time.After(1 * time.Second):
t.Error("Update wasn't send") t.Error("Update wasn't send")
} }
for range [channelBufferSize]int{} {
peersUpdater.SendUpdate(context.Background(), peer, update1)
}
update2 := &UpdateMessage{Update: &proto.SyncResponse{ update2 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{ NetworkMap: &proto.NetworkMap{
Serial: 10, Serial: 10,
}, },
}} }}
update3 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 8,
},
}}
peersUpdater.SendUpdate(context.Background(), peer, update2) peersUpdater.SendUpdate(context.Background(), peer, update2)
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
for range [channelBufferSize]int{} {
select { select {
case <-timeout: case <-timeout:
t.Error("timed out reading previously sent updates") t.Error("timed out reading previously sent updates")
case updateReader := <-peersUpdater.peerChannels[peer]: case updateReader := <-resultCh:
if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial { if updateReader.msg.Update.NetworkMap.Serial == update3.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent") t.Error("got the update that shouldn't have been sent")
}
} }
} }
} }
func TestCloseChannel(t *testing.T) { func TestCloseChannel(t *testing.T) {
peer := "test-close" peer := "test-close"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
_ = peersUpdater.CreateChannel(context.Background(), peer) _ = peersUpdater.CreateChannel(context.Background(), peer)
if _, ok := peersUpdater.peerChannels[peer]; !ok { if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel") t.Error("Error creating the channel")

View File

@@ -960,6 +960,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
) )
} }
// ideally this should run in a transaction
err = am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
if len(peerIDs) != 0 { if len(peerIDs) != 0 {
// this will trigger peer disconnect from the management service // this will trigger peer disconnect from the management service
am.peersUpdateManager.CloseChannels(ctx, peerIDs) am.peersUpdateManager.CloseChannels(ctx, peerIDs)