diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 1529756e8..caacef6d6 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -10,6 +10,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" pb "github.com/golang/protobuf/proto" // nolint @@ -45,6 +46,9 @@ import ( const ( envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS" envBlockPeers = "NB_BLOCK_SAME_PEERS" + envConcurrentSyncs = "NB_MAX_CONCURRENT_SYNCS" + + defaultSyncLim = 1000 ) // GRPCServer an instance of a Management gRPC API server @@ -65,6 +69,9 @@ type GRPCServer struct { blockPeersWithSameConfig bool integratedPeerValidator integrated_validator.IntegratedValidator debounce int + + syncSem atomic.Int32 + syncLim int32 } // NewServer creates a new Management server @@ -106,6 +113,16 @@ func NewServer( } } + syncLim := int32(defaultSyncLim) + if syncLimStr := os.Getenv(envConcurrentSyncs); syncLimStr != "" { + syncLimParsed, err := strconv.Atoi(syncLimStr) + if err != nil { + log.Errorf("invalid value for %s: %v using %d", envConcurrentSyncs, err, defaultSyncLim) + } else { + syncLim = int32(syncLimParsed) + } + } + return &GRPCServer{ wgKey: key, // peerKey -> event channel @@ -121,6 +138,8 @@ func NewServer( blockPeersWithSameConfig: blockPeersWithSameConfig, integratedPeerValidator: integratedPeerValidator, debounce: debounce, + + syncLim: syncLim, }, nil } @@ -162,6 +181,12 @@ func getRealIP(ctx context.Context) net.IP { // Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and // notifies the connected peer of any updates (e.g. new peers under the same account) func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error { + if s.syncSem.Load() >= s.syncLim { + return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later") + } + s.syncSem.Add(1) + defer s.syncSem.Add(-1) + reqStart := time.Now() ctx := srv.Context()