use buffer and adaptive debounce for account peers update

This commit is contained in:
Pascal Fischer
2025-10-09 16:12:09 +02:00
parent c115434d53
commit 6ac9e58911
14 changed files with 224 additions and 138 deletions

View File

@@ -1190,7 +1190,7 @@ func testAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
}, true)
require.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
updChan := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
wg := sync.WaitGroup{}
@@ -1198,7 +1198,12 @@ func testAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
message, _, _, err := updChan.NetworkMap.Pop(ctx)
if err != nil {
t.Errorf("timeout waiting for network update")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1229,12 +1234,11 @@ func testAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
defer manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
// Ensure that we do not receive an update message before the policy is deleted
time.Sleep(time.Second)
select {
case <-updMsg:
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, _, _, err := updMsg.NetworkMap.Pop(ctx)
if err != nil {
t.Logf("received addPeer update message before policy deletion")
default:
}
wg := sync.WaitGroup{}
@@ -1242,7 +1246,12 @@ func testAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
message, _, _, err := updMsg.NetworkMap.Pop(ctx)
if err != nil {
t.Errorf("timeout waiting for network update")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -1288,7 +1297,12 @@ func testAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
message, _, _, err := updMsg.NetworkMap.Pop(ctx)
if err != nil {
t.Errorf("timeout waiting for network update")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1362,7 +1376,12 @@ func testAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
message, _, _, err := updMsg.NetworkMap.Pop(ctx)
if err != nil {
t.Errorf("timeout waiting for network update")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
@@ -1427,7 +1446,12 @@ func testAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
message, _, _, err := updMsg.NetworkMap.Pop(ctx)
if err != nil {
t.Errorf("timeout waiting for network update")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -2939,7 +2963,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, err
}
@@ -3020,27 +3044,33 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
return manager, account, peer1, peer2, peer3
}
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldNotReceiveUpdate(t *testing.T, updateMessageBuffer *UpdateBuffer) {
t.Helper()
select {
case msg := <-updateMessage:
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
msg, _, _, _ := updateMessageBuffer.Pop(ctx)
if msg != nil {
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
return
}
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldReceiveUpdate(t *testing.T, updateMessageBuffer *UpdateBuffer) {
t.Helper()
select {
case msg := <-updateMessage:
if msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for update message")
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
msg, _, _, err := updateMessageBuffer.Pop(ctx)
if err != nil {
t.Errorf("Expected update message, but none received")
}
if msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
return
}
func BenchmarkSyncAndMarkPeer(b *testing.B) {
@@ -3077,11 +3107,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateChannel)
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = &UpdateChannel{
Important: make(chan *UpdateMessage, channelBufferSize),
NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()),
}
}
manager.peersUpdateManager.peerChannels = peerChannels
b.ResetTimer()
start := time.Now()
@@ -3140,9 +3172,12 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateChannel)
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = &UpdateChannel{
Important: make(chan *UpdateMessage, channelBufferSize),
NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()),
}
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -3210,9 +3245,12 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateChannel)
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = &UpdateChannel{
Important: make(chan *UpdateMessage, channelBufferSize),
NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()),
}
}
manager.peersUpdateManager.peerChannels = peerChannels

View File

@@ -609,7 +609,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
t.Run("saving dns setting with unused groups", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -629,7 +629,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
t.Run("creating dns setting with unused groups", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -662,7 +662,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -688,7 +688,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
t.Run("saving dns setting with used groups", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -708,7 +708,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
t.Run("removing group with no peers from dns settings", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -728,7 +728,7 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
t.Run("removing group with peers from dns settings", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -451,7 +451,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("saving unlinked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -474,7 +474,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("adding peer to unlinked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -493,7 +493,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("removing peer from unliked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -511,7 +511,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("deleting group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -544,7 +544,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("saving linked group to policy", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -566,7 +566,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("adding peer to linked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -584,7 +584,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
t.Run("removing peer from linked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -613,7 +613,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -654,7 +654,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -681,7 +681,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -728,7 +728,7 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -267,9 +267,9 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe
go func() {
for {
start := time.Now()
update, overwrites, timeSinceLastPop, ok := updates.NetworkMap.Pop(ctx)
update, overwrites, timeSinceLastPop, err := updates.NetworkMap.Pop(ctx)
log.WithContext(ctx).Debugf("popped an update for peer %s from the network map buffer in %v (overwrites: %d)", peerKey.String(), time.Since(start), overwrites)
if !ok {
if err != nil {
close(networkMapCh)
return
}

View File

@@ -1004,7 +1004,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1031,7 +1031,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1049,7 +1049,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1075,7 +1075,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1105,7 +1105,7 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
t.Run("deleting nameserver group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -1343,9 +1343,11 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
type bufferUpdate struct {
mu sync.Mutex
next *time.Timer
update atomic.Bool
mu sync.Mutex
next *time.Timer
update atomic.Bool
requestCount atomic.Int32
lastResetTime atomic.Int64 // Unix timestamp in milliseconds
}
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
@@ -1356,6 +1358,7 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a
if !b.mu.TryLock() {
b.update.Store(true)
b.requestCount.Add(1)
return
}
@@ -1365,18 +1368,41 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a
go func() {
defer b.mu.Unlock()
start := time.Now()
b.requestCount.Store(0)
am.UpdateAccountPeers(ctx, accountID)
if !b.update.Load() {
return
}
b.update.Store(false)
requestsDuringProcessing := b.requestCount.Load()
executionTime := time.Since(start)
requestsPerMinute := float64(requestsDuringProcessing) / executionTime.Seconds() * 60
adaptiveDelay := time.Duration(requestsPerMinute) * 250 * time.Millisecond
// cap the maximum delay to avoid too long delays
maxDelay := 30 * time.Second
if adaptiveDelay > maxDelay {
adaptiveDelay = maxDelay
}
log.WithContext(ctx).Debugf("adaptive debounce for account %s: %d requests during %v execution, waiting %v",
accountID, requestsDuringProcessing, executionTime, adaptiveDelay)
if b.next == nil {
b.next = time.AfterFunc(time.Duration(am.updateAccountPeersBufferInterval.Load()), func() {
b.next = time.AfterFunc(adaptiveDelay, func() {
am.UpdateAccountPeers(ctx, accountID)
})
return
}
b.next.Reset(time.Duration(am.updateAccountPeersBufferInterval.Load()))
b.next.Reset(adaptiveDelay)
}()
}

View File

@@ -26,7 +26,6 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
"github.com/netbirdio/netbird/management/server/mock_server"
"github.com/netbirdio/netbird/management/server/permissions"
@@ -954,14 +953,15 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
minMsPerOpCICD float64
maxMsPerOpCICD float64
}{
{"Small", 50, 5, 90, 120, 90, 120},
{"Medium", 500, 100, 110, 150, 120, 260},
{"Large", 5000, 200, 800, 1700, 2500, 5000},
{"Small single", 50, 10, 90, 120, 90, 120},
{"Medium single", 500, 10, 110, 170, 120, 200},
{"Large 5", 5000, 15, 1300, 2100, 4900, 7000},
{"Extra Large", 2000, 2000, 1300, 2400, 3000, 6400},
{"Small", 350, 5, 90, 120, 90, 120},
// {"Medium", 500, 100, 110, 150, 120, 260},
// {"Large", 5000, 2, 800, 1700, 2500, 5000},
// {"Small single", 50, 10, 90, 120, 90, 120},
// {"Medium single", 500, 10, 110, 170, 120, 200},
// {"Large 5", 5000, 15, 1300, 2100, 4900, 7000},
// {"Extra Large", 2000, 2000, 1300, 2400, 3000, 6400},
}
b.Setenv("NB_EXPERIMENT_NETWORK_MAP", "false")
log.SetOutput(io.Discard)
defer log.SetOutput(os.Stderr)
@@ -980,38 +980,39 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateChannel)
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = &UpdateChannel{
Important: make(chan *UpdateMessage, channelBufferSize),
NetworkMap: NewUpdateBuffer(manager.metrics.UpdateChannelMetrics()),
}
}
manager.peersUpdateManager.peerChannels = peerChannels
b.ResetTimer()
start := time.Now()
for i := 0; i < 1000; i++ {
b.Logf("Run %d", i)
manager.UpdateAccountPeers(ctx, account.Id)
mm(b)
manager.Store.IncrementNetworkSerial(ctx, account.Id)
}
for i := 0; i < b.N; i++ {
manager.UpdateAccountPeers(ctx, account.Id)
}
duration := time.Since(start)
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
b.ReportMetric(msPerOp, "ms/op")
maxExpected := bc.maxMsPerOpLocal
if os.Getenv("CI") == "true" {
maxExpected = bc.maxMsPerOpCICD
testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "login", "newPeer")
}
if msPerOp > maxExpected {
b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected)
}
})
}
}
func mm(b *testing.B) {
var m runtime.MemStats
runtime.ReadMemStats(&m)
// runtime.GC()
b.Logf("Alloc: %v MB, TotalAlloc: %v MB, Sys: %v MB, NumGC: %v", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC)
}
func TestUpdateAccountPeers_Experimental(t *testing.T) {
t.Setenv(envNewNetworkMapBuilder, "true")
testUpdateAccountPeers(t)
@@ -1044,22 +1045,30 @@ func testUpdateAccountPeers(t *testing.T) {
ctx := context.Background()
metrics, err := telemetry.NewDefaultAppMetrics(ctx)
if err != nil {
t.Fatalf("Failed to create metrics: %v", err)
}
account, err := manager.Store.GetAccount(ctx, accountID)
if err != nil {
t.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateChannel)
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = &UpdateChannel{
Important: make(chan *UpdateMessage, channelBufferSize),
NetworkMap: NewUpdateBuffer(metrics.UpdateChannelMetrics()),
}
}
manager.peersUpdateManager.peerChannels = peerChannels
manager.UpdateAccountPeers(ctx, account.Id)
for _, channel := range peerChannels {
update := <-channel
update, _, _, _ := channel.NetworkMap.Pop(ctx)
assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.Update.NetworkMap.RemotePeers))
assert.Equal(t, tc.peers*2, len(update.Update.NetworkMap.FirewallRules))
@@ -1791,7 +1800,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1809,7 +1818,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("adding peer to unlinked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg) //
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap) //
close(done)
}()
@@ -1834,7 +1843,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("deleting peer with unlinked group", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1852,7 +1861,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("updating peer label", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1875,7 +1884,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
manager.integratedPeerValidator = MockIntegratedValidator{ValidatePeerFunc: requireUpdateFunc}
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1897,7 +1906,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
manager.integratedPeerValidator = MockIntegratedValidator{ValidatePeerFunc: requireNoUpdateFunc}
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1930,7 +1939,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1956,7 +1965,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("deleting peer with linked group to policy", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1994,7 +2003,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2020,7 +2029,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("deleting peer with linked group to route", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2049,7 +2058,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2075,7 +2084,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
t.Run("deleting peer with linked group to route", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -1034,7 +1034,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1065,7 +1065,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1097,7 +1097,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1129,7 +1129,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1160,7 +1160,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1180,7 +1180,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1201,7 +1201,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1220,7 +1220,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1240,7 +1240,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1258,7 +1258,7 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
t.Run("deleting policy with no peers in groups", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -180,7 +180,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
t.Run("saving unused posture check", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -198,7 +198,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
t.Run("updating unused posture check", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -235,7 +235,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
t.Run("linking posture check to policy with peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -264,7 +264,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -282,7 +282,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
t.Run("removing posture check from policy", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -301,7 +301,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
t.Run("deleting unused posture check", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -337,7 +337,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -381,7 +381,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg1)
peerShouldReceiveUpdate(t, updMsg1.NetworkMap)
close(done)
}()
@@ -420,7 +420,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -1998,7 +1998,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2034,7 +2034,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2070,7 +2070,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("creating route with a routing peer", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2095,7 +2095,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2113,7 +2113,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("deleting route", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2149,7 +2149,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -2189,7 +2189,7 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -431,7 +431,7 @@ func TestSetupKeyAccountPeersUpdate(t *testing.T) {
t.Run("creating setup key", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -449,7 +449,7 @@ func TestSetupKeyAccountPeersUpdate(t *testing.T) {
t.Run("saving setup key", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()

View File

@@ -121,7 +121,7 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
loop:
for timeout := time.After(5 * time.Second); ; {
select {
case update := <-updateChannel:
case update := <-updateChannel.Important:
updates = append(updates, update)
case <-timeout:
break loop

View File

@@ -2,6 +2,7 @@ package server
import (
"context"
"fmt"
"sync"
"time"
@@ -45,11 +46,17 @@ func (b *UpdateBuffer) Push(update *UpdateMessage) {
b.metrics.CountBufferIgnore()
}
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Duration, bool) {
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Duration, error) {
b.mu.Lock()
defer b.mu.Unlock()
for b.update == nil && !b.closed {
select {
case <-ctx.Done():
return nil, 0, 0, fmt.Errorf("context cancelled")
default:
}
waitCh := make(chan struct{})
go func() {
select {
@@ -61,10 +68,16 @@ func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Durat
}()
b.cond.Wait()
close(waitCh)
select {
case <-ctx.Done():
return nil, 0, 0, fmt.Errorf("context cancelled")
default:
}
}
if b.closed {
return nil, 0, 0, false
return nil, 0, 0, fmt.Errorf("buffer closed")
}
msg := b.update
@@ -82,7 +95,7 @@ func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, int, time.Durat
b.overwriteCount = 0
b.lastPopTime = now
return msg, overwrites, timeSinceLastPop, true
return msg, overwrites, timeSinceLastPop, nil
}
func (b *UpdateBuffer) Close() {

View File

@@ -1366,7 +1366,7 @@ func TestUserAccountPeersUpdate(t *testing.T) {
t.Run("creating new regular user with no groups", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1389,7 +1389,7 @@ func TestUserAccountPeersUpdate(t *testing.T) {
t.Run("updating user with no linked peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1412,7 +1412,7 @@ func TestUserAccountPeersUpdate(t *testing.T) {
t.Run("deleting user with no linked peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
peerShouldNotReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1449,7 +1449,7 @@ func TestUserAccountPeersUpdate(t *testing.T) {
t.Run("updating user with linked peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
peerShouldReceiveUpdate(t, updMsg.NetworkMap)
close(done)
}()
@@ -1477,7 +1477,7 @@ func TestUserAccountPeersUpdate(t *testing.T) {
t.Run("deleting user with linked peers", func(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, peer4UpdMsg)
peerShouldReceiveUpdate(t, peer4UpdMsg.NetworkMap)
close(done)
}()