diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 2c2d917ab..b639c6613 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -7,6 +7,7 @@ import ( "net" "net/netip" "os" + "strconv" "strings" "sync" "time" @@ -63,6 +64,7 @@ type GRPCServer struct { logBlockedPeers bool blockPeersWithSameConfig bool integratedPeerValidator integrated_validator.IntegratedValidator + debounce int } // NewServer creates a new Management server @@ -96,6 +98,14 @@ func NewServer( logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true" blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true" + debounce := 1 + if decounceStr := os.Getenv("NB_UPDATE_BUFFER_DEBOUNCE_S"); decounceStr != "" { + debounce, err = strconv.Atoi(decounceStr) + if err != nil { + log.Errorf("invalid value for NB_UPDATE_BUFFER_DEBOUNCE_S: %v using 1 second", err) + } + } + return &GRPCServer{ wgKey: key, // peerKey -> event channel @@ -110,6 +120,7 @@ func NewServer( logBlockedPeers: logBlockedPeers, blockPeersWithSameConfig: blockPeersWithSameConfig, integratedPeerValidator: integratedPeerValidator, + debounce: debounce, }, nil } @@ -259,10 +270,12 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe select { case networkMapCh <- update: log.WithContext(ctx).Debugf("forwarded an update for peer %s from the network map buffer in %v", peerKey.String(), time.Since(start)) + start = time.Now() + time.Sleep(time.Duration(s.debounce) * time.Second) + log.WithContext(ctx).Debugf("debounced for %v seconds for peer %s", time.Since(start), peerKey.String()) case <-ctx.Done(): return } - time.Sleep(1 * time.Second) } }()