diff --git a/management/server/account_test.go b/management/server/account_test.go index b508e99c2..c40db3623 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -1190,7 +1190,7 @@ func testAccountManager_NetworkUpdates_SaveGroup(t *testing.T) { }, true) require.NoError(t, err) - updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID) + updChan := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID) defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID) wg := sync.WaitGroup{} @@ -1198,7 +1198,12 @@ func testAccountManager_NetworkUpdates_SaveGroup(t *testing.T) { go func() { defer wg.Done() - message := <-updMsg + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + message, _, _, err := updChan.NetworkMap.Pop(ctx) + if err != nil { + t.Errorf("timeout waiting for network update") + } networkMap := message.Update.GetNetworkMap() if len(networkMap.RemotePeers) != 2 { t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) @@ -1229,12 +1234,11 @@ func testAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) { updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID) defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID) - // Ensure that we do not receive an update message before the policy is deleted - time.Sleep(time.Second) - select { - case <-updMsg: + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + _, _, _, err := updMsg.NetworkMap.Pop(ctx) + if err != nil { t.Logf("received addPeer update message before policy deletion") - default: } wg := sync.WaitGroup{} @@ -1242,7 +1246,12 @@ func testAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) { go func() { defer wg.Done() - message := <-updMsg + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + message, _, _, err := updMsg.NetworkMap.Pop(ctx) + if err != nil { + t.Errorf("timeout waiting for network update") + } networkMap := message.Update.GetNetworkMap() if len(networkMap.RemotePeers) != 0 { t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) @@ -1288,7 +1297,12 @@ func testAccountManager_NetworkUpdates_SavePolicy(t *testing.T) { go func() { defer wg.Done() - message := <-updMsg + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + message, _, _, err := updMsg.NetworkMap.Pop(ctx) + if err != nil { + t.Errorf("timeout waiting for network update") + } networkMap := message.Update.GetNetworkMap() if len(networkMap.RemotePeers) != 2 { t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) @@ -1362,7 +1376,12 @@ func testAccountManager_NetworkUpdates_DeletePeer(t *testing.T) { go func() { defer wg.Done() - message := <-updMsg + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + message, _, _, err := updMsg.NetworkMap.Pop(ctx) + if err != nil { + t.Errorf("timeout waiting for network update") + } networkMap := message.Update.GetNetworkMap() if len(networkMap.RemotePeers) != 1 { t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers)) @@ -1427,7 +1446,12 @@ func testAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) { go func() { defer wg.Done() - message := <-updMsg + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + message, _, _, err := updMsg.NetworkMap.Pop(ctx) + if err != nil { + t.Errorf("timeout waiting for network update") + } networkMap := message.Update.GetNetworkMap() if len(networkMap.RemotePeers) != 0 { t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) @@ -2939,7 +2963,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) { permissionsManager := permissions.NewManager(store) - manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) + manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) if err != nil { return nil, err } @@ -3020,27 +3044,33 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account, return manager, account, peer1, peer2, peer3 } -func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { +func peerShouldNotReceiveUpdate(t *testing.T, updateMessageBuffer *UpdateBuffer) { t.Helper() - select { - case msg := <-updateMessage: + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + msg, _, _, _ := updateMessageBuffer.Pop(ctx) + if msg != nil { t.Errorf("Unexpected message received: %+v", msg) - case <-time.After(500 * time.Millisecond): - return } + + return } -func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { +func peerShouldReceiveUpdate(t *testing.T, updateMessageBuffer *UpdateBuffer) { t.Helper() - select { - case msg := <-updateMessage: - if msg == nil { - t.Errorf("Received nil update message, expected valid message") - } - case <-time.After(500 * time.Millisecond): - t.Error("Timed out waiting for update message") + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + msg, _, _, err := updateMessageBuffer.Pop(ctx) + if err != nil { + t.Errorf("Expected update message, but none received") } + if msg == nil { + t.Errorf("Received nil update message, expected valid message") + } + + return } func BenchmarkSyncAndMarkPeer(b *testing.B) { @@ -3077,11 +3107,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) { if err != nil { b.Fatalf("Failed to get account: %v", err) } - peerChannels := make(map[string]chan *UpdateMessage) + peerChannels := make(map[string]*UpdateChannel) for peerID := range account.Peers { - peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) + peerChannels[peerID] = &UpdateChannel{ + Important: make(chan *UpdateMessage, channelBufferSize), + NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()), + } } - manager.peersUpdateManager.peerChannels = peerChannels b.ResetTimer() start := time.Now() @@ -3140,9 +3172,12 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) { if err != nil { b.Fatalf("Failed to get account: %v", err) } - peerChannels := make(map[string]chan *UpdateMessage) + peerChannels := make(map[string]*UpdateChannel) for peerID := range account.Peers { - peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) + peerChannels[peerID] = &UpdateChannel{ + Important: make(chan *UpdateMessage, channelBufferSize), + NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()), + } } manager.peersUpdateManager.peerChannels = peerChannels @@ -3210,9 +3245,12 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) { if err != nil { b.Fatalf("Failed to get account: %v", err) } - peerChannels := make(map[string]chan *UpdateMessage) + peerChannels := make(map[string]*UpdateChannel) for peerID := range account.Peers { - peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) + peerChannels[peerID] = &UpdateChannel{ + Important: make(chan *UpdateMessage, channelBufferSize), + NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()), + } } manager.peersUpdateManager.peerChannels = peerChannels diff --git a/management/server/dns_test.go b/management/server/dns_test.go index 83caf74ef..3eb03702e 100644 --- a/management/server/dns_test.go +++ b/management/server/dns_test.go @@ -609,7 +609,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { t.Run("saving dns setting with unused groups", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -629,7 +629,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { t.Run("creating dns setting with unused groups", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -662,7 +662,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -688,7 +688,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { t.Run("saving dns setting with used groups", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -708,7 +708,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { t.Run("removing group with no peers from dns settings", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -728,7 +728,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) { t.Run("removing group with peers from dns settings", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/group_test.go b/management/server/group_test.go index 31ff29cbc..e1f69acef 100644 --- a/management/server/group_test.go +++ b/management/server/group_test.go @@ -451,7 +451,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("saving unlinked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -474,7 +474,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -493,7 +493,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("removing peer from unliked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -511,7 +511,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("deleting group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -544,7 +544,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("saving linked group to policy", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -566,7 +566,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("adding peer to linked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -584,7 +584,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { t.Run("removing peer from linked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -613,7 +613,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -654,7 +654,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -681,7 +681,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -728,7 +728,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 8c0b275e6..1529756e8 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -267,9 +267,9 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe go func() { for { start := time.Now() - update, overwrites, timeSinceLastPop, ok := updates.NetworkMap.Pop(ctx) + update, overwrites, timeSinceLastPop, err := updates.NetworkMap.Pop(ctx) log.WithContext(ctx).Debugf("popped an update for peer %s from the network map buffer in %v (overwrites: %d)", peerKey.String(), time.Since(start), overwrites) - if !ok { + if err != nil { close(networkMapCh) return } diff --git a/management/server/nameserver_test.go b/management/server/nameserver_test.go index 6c985410c..3bb408af8 100644 --- a/management/server/nameserver_test.go +++ b/management/server/nameserver_test.go @@ -1004,7 +1004,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) { t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1031,7 +1031,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) { t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1049,7 +1049,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) { t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1075,7 +1075,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) { t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1105,7 +1105,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) { t.Run("deleting nameserver group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/peer.go b/management/server/peer.go index 73aaef801..639ff8801 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -1343,9 +1343,11 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account } type bufferUpdate struct { - mu sync.Mutex - next *time.Timer - update atomic.Bool + mu sync.Mutex + next *time.Timer + update atomic.Bool + requestCount atomic.Int32 + lastResetTime atomic.Int64 // Unix timestamp in milliseconds } func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { @@ -1356,6 +1358,7 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a if !b.mu.TryLock() { b.update.Store(true) + b.requestCount.Add(1) return } @@ -1365,18 +1368,41 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a go func() { defer b.mu.Unlock() + + start := time.Now() + + b.requestCount.Store(0) + am.UpdateAccountPeers(ctx, accountID) + if !b.update.Load() { return } b.update.Store(false) + + requestsDuringProcessing := b.requestCount.Load() + executionTime := time.Since(start) + + requestsPerMinute := float64(requestsDuringProcessing) / executionTime.Seconds() * 60 + + adaptiveDelay := time.Duration(requestsPerMinute) * 250 * time.Millisecond + + // cap the maximum delay to avoid too long delays + maxDelay := 30 * time.Second + if adaptiveDelay > maxDelay { + adaptiveDelay = maxDelay + } + + log.WithContext(ctx).Debugf("adaptive debounce for account %s: %d requests during %v execution, waiting %v", + accountID, requestsDuringProcessing, executionTime, adaptiveDelay) + if b.next == nil { - b.next = time.AfterFunc(time.Duration(am.updateAccountPeersBufferInterval.Load()), func() { + b.next = time.AfterFunc(adaptiveDelay, func() { am.UpdateAccountPeers(ctx, accountID) }) return } - b.next.Reset(time.Duration(am.updateAccountPeersBufferInterval.Load())) + b.next.Reset(adaptiveDelay) }() } diff --git a/management/server/peer_test.go b/management/server/peer_test.go index dde23eeea..a3fcd829e 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -26,7 +26,6 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "github.com/netbirdio/netbird/management/internals/server/config" - "github.com/netbirdio/netbird/management/server/http/testing/testing_tools" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/mock_server" "github.com/netbirdio/netbird/management/server/permissions" @@ -954,14 +953,15 @@ func BenchmarkUpdateAccountPeers(b *testing.B) { minMsPerOpCICD float64 maxMsPerOpCICD float64 }{ - {"Small", 50, 5, 90, 120, 90, 120}, - {"Medium", 500, 100, 110, 150, 120, 260}, - {"Large", 5000, 200, 800, 1700, 2500, 5000}, - {"Small single", 50, 10, 90, 120, 90, 120}, - {"Medium single", 500, 10, 110, 170, 120, 200}, - {"Large 5", 5000, 15, 1300, 2100, 4900, 7000}, - {"Extra Large", 2000, 2000, 1300, 2400, 3000, 6400}, + {"Small", 350, 5, 90, 120, 90, 120}, + // {"Medium", 500, 100, 110, 150, 120, 260}, + // {"Large", 5000, 2, 800, 1700, 2500, 5000}, + // {"Small single", 50, 10, 90, 120, 90, 120}, + // {"Medium single", 500, 10, 110, 170, 120, 200}, + // {"Large 5", 5000, 15, 1300, 2100, 4900, 7000}, + // {"Extra Large", 2000, 2000, 1300, 2400, 3000, 6400}, } + b.Setenv("NB_EXPERIMENT_NETWORK_MAP", "false") log.SetOutput(io.Discard) defer log.SetOutput(os.Stderr) @@ -980,38 +980,39 @@ func BenchmarkUpdateAccountPeers(b *testing.B) { b.Fatalf("Failed to get account: %v", err) } - peerChannels := make(map[string]chan *UpdateMessage) + peerChannels := make(map[string]*UpdateChannel) for peerID := range account.Peers { - peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) + peerChannels[peerID] = &UpdateChannel{ + Important: make(chan *UpdateMessage, channelBufferSize), + NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()), + } } manager.peersUpdateManager.peerChannels = peerChannels - b.ResetTimer() - start := time.Now() + for i := 0; i < 1000; i++ { + b.Logf("Run %d", i) + manager.UpdateAccountPeers(ctx, account.Id) + mm(b) + manager.Store.IncrementNetworkSerial(ctx, account.Id) + } for i := 0; i < b.N; i++ { manager.UpdateAccountPeers(ctx, account.Id) } - duration := time.Since(start) - msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6 - b.ReportMetric(msPerOp, "ms/op") - - maxExpected := bc.maxMsPerOpLocal - if os.Getenv("CI") == "true" { - maxExpected = bc.maxMsPerOpCICD - testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "login", "newPeer") - } - - if msPerOp > maxExpected { - b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected) - } }) } } +func mm(b *testing.B) { + var m runtime.MemStats + runtime.ReadMemStats(&m) + // runtime.GC() + b.Logf("Alloc: %v MB, TotalAlloc: %v MB, Sys: %v MB, NumGC: %v", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC) +} + func TestUpdateAccountPeers_Experimental(t *testing.T) { t.Setenv(envNewNetworkMapBuilder, "true") testUpdateAccountPeers(t) @@ -1044,22 +1045,30 @@ func testUpdateAccountPeers(t *testing.T) { ctx := context.Background() + metrics, err := telemetry.NewDefaultAppMetrics(ctx) + if err != nil { + t.Fatalf("Failed to create metrics: %v", err) + } + account, err := manager.Store.GetAccount(ctx, accountID) if err != nil { t.Fatalf("Failed to get account: %v", err) } - peerChannels := make(map[string]chan *UpdateMessage) + peerChannels := make(map[string]*UpdateChannel) for peerID := range account.Peers { - peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) + peerChannels[peerID] = &UpdateChannel{ + Important: make(chan *UpdateMessage, channelBufferSize), + NetworkMap: NewUpdateBuffer(metrics.UpdateChannelMetrics()), + } } manager.peersUpdateManager.peerChannels = peerChannels manager.UpdateAccountPeers(ctx, account.Id) for _, channel := range peerChannels { - update := <-channel + update, _, _, _ := channel.NetworkMap.Pop(ctx) assert.Nil(t, update.Update.NetbirdConfig) assert.Equal(t, tc.peers, len(update.Update.NetworkMap.RemotePeers)) assert.Equal(t, tc.peers*2, len(update.Update.NetworkMap.FirewallRules)) @@ -1791,7 +1800,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1809,7 +1818,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) // + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) // close(done) }() @@ -1834,7 +1843,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("deleting peer with unlinked group", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1852,7 +1861,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("updating peer label", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1875,7 +1884,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { manager.integratedPeerValidator = MockIntegratedValidator{ValidatePeerFunc: requireUpdateFunc} done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1897,7 +1906,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { manager.integratedPeerValidator = MockIntegratedValidator{ValidatePeerFunc: requireNoUpdateFunc} done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1930,7 +1939,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1956,7 +1965,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("deleting peer with linked group to policy", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1994,7 +2003,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2020,7 +2029,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2049,7 +2058,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2075,7 +2084,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/policy_test.go b/management/server/policy_test.go index 4a08f4c33..e06408993 100644 --- a/management/server/policy_test.go +++ b/management/server/policy_test.go @@ -1034,7 +1034,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("saving policy with rule groups with no peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1065,7 +1065,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("saving policy where source has peers but destination does not", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1097,7 +1097,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("saving policy where destination has peers but source does not", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1129,7 +1129,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("saving policy with source and destination groups with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1160,7 +1160,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1180,7 +1180,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1201,7 +1201,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1220,7 +1220,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1240,7 +1240,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1258,7 +1258,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) { t.Run("deleting policy with no peers in groups", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/posture_checks_test.go b/management/server/posture_checks_test.go index 67760d55a..103f0ab55 100644 --- a/management/server/posture_checks_test.go +++ b/management/server/posture_checks_test.go @@ -180,7 +180,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { t.Run("saving unused posture check", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -198,7 +198,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { t.Run("updating unused posture check", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -235,7 +235,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { t.Run("linking posture check to policy with peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -264,7 +264,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -282,7 +282,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { t.Run("removing posture check from policy", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -301,7 +301,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { t.Run("deleting unused posture check", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -337,7 +337,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -381,7 +381,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg1) + peerShouldReceiveUpdate(t, updMsg1.NetworkMap) close(done) }() @@ -420,7 +420,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/route_test.go b/management/server/route_test.go index 388db140c..d8f878edf 100644 --- a/management/server/route_test.go +++ b/management/server/route_test.go @@ -1998,7 +1998,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2034,7 +2034,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2070,7 +2070,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { t.Run("creating route with a routing peer", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2095,7 +2095,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2113,7 +2113,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { t.Run("deleting route", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2149,7 +2149,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -2189,7 +2189,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/setupkey_test.go b/management/server/setupkey_test.go index e55b33c94..3173639ca 100644 --- a/management/server/setupkey_test.go +++ b/management/server/setupkey_test.go @@ -431,7 +431,7 @@ func TestSetupKeyAccountPeersUpdate(t *testing.T) { t.Run("creating setup key", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -449,7 +449,7 @@ func TestSetupKeyAccountPeersUpdate(t *testing.T) { t.Run("saving setup key", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() diff --git a/management/server/token_mgr_test.go b/management/server/token_mgr_test.go index 5c956dc31..1088eefd1 100644 --- a/management/server/token_mgr_test.go +++ b/management/server/token_mgr_test.go @@ -121,7 +121,7 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) { loop: for timeout := time.After(5 * time.Second); ; { select { - case update := <-updateChannel: + case update := <-updateChannel.Important: updates = append(updates, update) case <-timeout: break loop diff --git a/management/server/update_buffer.go b/management/server/update_buffer.go index 26e5507c6..8ae170f97 100644 --- a/management/server/update_buffer.go +++ b/management/server/update_buffer.go @@ -2,6 +2,7 @@ package server import ( "context" + "fmt" "sync" "time" @@ -45,11 +46,17 @@ func (b *UpdateBuffer) Push(update *UpdateMessage) { b.metrics.CountBufferIgnore() } -func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Duration, bool) { +func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Duration, error) { b.mu.Lock() defer b.mu.Unlock() for b.update == nil && !b.closed { + select { + case <-ctx.Done(): + return nil, 0, 0, fmt.Errorf("context cancelled") + default: + } + waitCh := make(chan struct{}) go func() { select { @@ -61,10 +68,16 @@ func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Durat }() b.cond.Wait() close(waitCh) + + select { + case <-ctx.Done(): + return nil, 0, 0, fmt.Errorf("context cancelled") + default: + } } if b.closed { - return nil, 0, 0, false + return nil, 0, 0, fmt.Errorf("buffer closed") } msg := b.update @@ -82,7 +95,7 @@ func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Durat b.overwriteCount = 0 b.lastPopTime = now - return msg, overwrites, timeSinceLastPop, true + return msg, overwrites, timeSinceLastPop, nil } func (b *UpdateBuffer) Close() { diff --git a/management/server/user_test.go b/management/server/user_test.go index 5920a2a33..8f6d0d4eb 100644 --- a/management/server/user_test.go +++ b/management/server/user_test.go @@ -1366,7 +1366,7 @@ func TestUserAccountPeersUpdate(t *testing.T) { t.Run("creating new regular user with no groups", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1389,7 +1389,7 @@ func TestUserAccountPeersUpdate(t *testing.T) { t.Run("updating user with no linked peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1412,7 +1412,7 @@ func TestUserAccountPeersUpdate(t *testing.T) { t.Run("deleting user with no linked peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldNotReceiveUpdate(t, updMsg) + peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1449,7 +1449,7 @@ func TestUserAccountPeersUpdate(t *testing.T) { t.Run("updating user with linked peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, updMsg) + peerShouldReceiveUpdate(t, updMsg.NetworkMap) close(done) }() @@ -1477,7 +1477,7 @@ func TestUserAccountPeersUpdate(t *testing.T) { t.Run("deleting user with linked peers", func(t *testing.T) { done := make(chan struct{}) go func() { - peerShouldReceiveUpdate(t, peer4UpdMsg) + peerShouldReceiveUpdate(t, peer4UpdMsg.NetworkMap) close(done) }()