simple balancing

This commit is contained in:
crn4
2025-10-17 16:06:53 +02:00
parent 1793ad8ff0
commit 31e3928e08

View File

@@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
pb "github.com/golang/protobuf/proto" // nolint
@@ -45,6 +46,9 @@ import (
const (
envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
envBlockPeers = "NB_BLOCK_SAME_PEERS"
envConcurrentSyncs = "NB_MAX_CONCURRENT_SYNCS"
defaultSyncLim = 1000
)
// GRPCServer an instance of a Management gRPC API server
@@ -65,6 +69,9 @@ type GRPCServer struct {
blockPeersWithSameConfig bool
integratedPeerValidator integrated_validator.IntegratedValidator
debounce int
syncSem atomic.Int32
syncLim int32
}
// NewServer creates a new Management server
@@ -106,6 +113,16 @@ func NewServer(
}
}
syncLim := int32(defaultSyncLim)
if syncLimStr := os.Getenv(envConcurrentSyncs); syncLimStr != "" {
syncLimParsed, err := strconv.Atoi(syncLimStr)
if err != nil {
log.Errorf("invalid value for %s: %v using %d", envConcurrentSyncs, err, defaultSyncLim)
} else {
syncLim = int32(syncLimParsed)
}
}
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
@@ -121,6 +138,8 @@ func NewServer(
blockPeersWithSameConfig: blockPeersWithSameConfig,
integratedPeerValidator: integratedPeerValidator,
debounce: debounce,
syncLim: syncLim,
}, nil
}
@@ -162,6 +181,12 @@ func getRealIP(ctx context.Context) net.IP {
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
if s.syncSem.Load() >= s.syncLim {
return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later")
}
s.syncSem.Add(1)
defer s.syncSem.Add(-1)
reqStart := time.Now()
ctx := srv.Context()