[management] Feature/grpc debounce msgtype (#5239)

* Add gRPC update debouncing mechanism

Implements backpressure handling for peer network map updates to
efficiently handle rapid changes. First update is sent immediately,
subsequent rapid updates are coalesced, ensuring only the latest
update is sent after a 1-second quiet period.

* Enhance unit test to verify peer count synchronization with debouncing and timeout handling

* Debounce based on type

* Refactor test to validate timer restart after pending update dispatch

* Simplify timer reset for Go 1.23+ automatic channel draining

Remove manual channel drain in resetTimer() since Go 1.23+ automatically
drains the timer channel when Stop() returns false, making the
select-case pattern unnecessary.
This commit is contained in:
Zoltan Papp
2026-02-06 19:47:38 +01:00
committed by GitHub
parent af8f730bda
commit 3be16d19a0
8 changed files with 818 additions and 22 deletions

View File

@@ -610,6 +610,7 @@ func TestSync10PeersGetUpdates(t *testing.T) {
initialPeers := 10
additionalPeers := 10
expectedPeerCount := initialPeers + additionalPeers - 1 // -1 because peer doesn't see itself
var peers []wgtypes.Key
for i := 0; i < initialPeers; i++ {
@@ -618,8 +619,19 @@ func TestSync10PeersGetUpdates(t *testing.T) {
peers = append(peers, key)
}
// Track the maximum peer count each peer has seen
type peerState struct {
mu sync.Mutex
maxPeerCount int
done bool
}
peerStates := make(map[string]*peerState)
for _, pk := range peers {
peerStates[pk.PublicKey().String()] = &peerState{}
}
var wg sync.WaitGroup
wg.Add(initialPeers + initialPeers*additionalPeers)
wg.Add(initialPeers) // One completion per initial peer
var syncClients []mgmtProto.ManagementService_SyncClient
for _, pk := range peers {
@@ -643,6 +655,9 @@ func TestSync10PeersGetUpdates(t *testing.T) {
syncClients = append(syncClients, s)
go func(pk wgtypes.Key, syncStream mgmtProto.ManagementService_SyncClient) {
pubKey := pk.PublicKey().String()
state := peerStates[pubKey]
for {
encMsg := &mgmtProto.EncryptedMessage{}
err := syncStream.RecvMsg(encMsg)
@@ -651,19 +666,28 @@ func TestSync10PeersGetUpdates(t *testing.T) {
}
decryptedBytes, decErr := encryption.Decrypt(encMsg.Body, ts.serverPubKey, pk)
if decErr != nil {
t.Errorf("failed to decrypt SyncResponse for peer %s: %v", pk.PublicKey().String(), decErr)
t.Errorf("failed to decrypt SyncResponse for peer %s: %v", pubKey, decErr)
return
}
resp := &mgmtProto.SyncResponse{}
umErr := pb.Unmarshal(decryptedBytes, resp)
if umErr != nil {
t.Errorf("failed to unmarshal SyncResponse for peer %s: %v", pk.PublicKey().String(), umErr)
t.Errorf("failed to unmarshal SyncResponse for peer %s: %v", pubKey, umErr)
return
}
// We only count if there's a new peer update
if len(resp.GetRemotePeers()) > 0 {
// Track the maximum peer count seen (due to debouncing, updates are coalesced)
peerCount := len(resp.GetRemotePeers())
state.mu.Lock()
if peerCount > state.maxPeerCount {
state.maxPeerCount = peerCount
}
// Signal completion when this peer has seen all expected peers
if !state.done && state.maxPeerCount >= expectedPeerCount {
state.done = true
wg.Done()
}
state.mu.Unlock()
}
}(pk, s)
}
@@ -677,7 +701,30 @@ func TestSync10PeersGetUpdates(t *testing.T) {
time.Sleep(time.Duration(n) * time.Millisecond)
}
wg.Wait()
// Wait for debouncer to flush final updates (debounce interval is 1000ms)
time.Sleep(1500 * time.Millisecond)
// Wait with timeout
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
// Success - all peers received expected peer count
case <-time.After(5 * time.Second):
// Timeout - report which peers didn't receive all updates
t.Error("Timeout waiting for all peers to receive updates")
for pubKey, state := range peerStates {
state.mu.Lock()
if state.maxPeerCount < expectedPeerCount {
t.Errorf("Peer %s only saw %d peers, expected %d", pubKey, state.maxPeerCount, expectedPeerCount)
}
state.mu.Unlock()
}
}
for _, sc := range syncClients {
err := sc.CloseSend()