From 9217df05ebdc013dc51c073f21aaf816b3abe906 Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Fri, 10 Oct 2025 10:44:23 +0200 Subject: [PATCH] add worker pool support --- signal/loadtest/README.md | 34 +++++++++++++++++++++ signal/loadtest/cmd/signal-loadtest/main.go | 6 ++++ signal/loadtest/rate_loadtest.go | 18 +++++++++-- 3 files changed, 56 insertions(+), 2 deletions(-) diff --git a/signal/loadtest/README.md b/signal/loadtest/README.md index 8c11356de..46027a5ba 100644 --- a/signal/loadtest/README.md +++ b/signal/loadtest/README.md @@ -69,6 +69,16 @@ go build -o signal-loadtest -insecure-skip-verify \ -log-level debug +# High load test with custom worker pool +./signal-loadtest \ + -server http://localhost:10000 \ + -pairs-per-sec 100 \ + -total-pairs 1000 \ + -worker-pool-size 500 \ + -channel-buffer-size 1000 \ + -exchange-duration 60s \ + -log-level info + # Show help ./signal-loadtest -h ``` @@ -81,6 +91,8 @@ go build -o signal-loadtest - `-test-duration`: Maximum test duration, 0 = unlimited (default: 0) - `-exchange-duration`: Continuous exchange duration per pair, 0 = single message (default: 0) - `-message-interval`: Interval between messages in continuous mode (default: 100ms) +- `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0) +- `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0) - `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false) - `-log-level`: Log level: trace, debug, info, warn, error (default: info) @@ -173,9 +185,31 @@ func main() { - **TestDuration**: Maximum test duration (optional, 0 = no limit) - **ExchangeDuration**: Duration for continuous message exchange per pair (0 = single message) - **MessageInterval**: Interval between messages in continuous mode (default: 100ms) +- **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2) +- **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4) - **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates) - **RampUpDuration**: Gradual ramp-up period (not yet implemented) +### Performance Tuning + +When running high-load tests, you may need to adjust the worker pool and buffer sizes: + +- **Default sizing**: Auto-configured based on `PairsPerSecond` + - Worker pool: `PairsPerSecond × 2` + - Channel buffer: `PairsPerSecond × 4` +- **For continuous exchange**: Increase worker pool size (e.g., `PairsPerSecond × 5`) +- **For high pair rates** (>50/sec): Increase both worker pool and buffer proportionally +- **Signs you need more workers**: Log warnings about "Worker pool saturated" + +Example for 100 pairs/sec with continuous exchange: +```go +config := LoadTestConfig{ + PairsPerSecond: 100, + WorkerPoolSize: 500, // 5x pairs/sec + ChannelBufferSize: 1000, // 10x pairs/sec +} +``` + ## Metrics The load test collects and reports: diff --git a/signal/loadtest/cmd/signal-loadtest/main.go b/signal/loadtest/cmd/signal-loadtest/main.go index 4afb44972..f7abf1346 100644 --- a/signal/loadtest/cmd/signal-loadtest/main.go +++ b/signal/loadtest/cmd/signal-loadtest/main.go @@ -20,6 +20,8 @@ var ( exchangeDuration time.Duration messageInterval time.Duration insecureSkipVerify bool + workerPoolSize int + channelBufferSize int logLevel string ) @@ -32,6 +34,8 @@ func init() { flag.DurationVar(&exchangeDuration, "exchange-duration", 0, "Duration for continuous message exchange per pair (0 = single message)") flag.DurationVar(&messageInterval, "message-interval", 100*time.Millisecond, "Interval between messages in continuous mode") flag.BoolVar(&insecureSkipVerify, "insecure-skip-verify", false, "Skip TLS certificate verification (use with self-signed certificates)") + flag.IntVar(&workerPoolSize, "worker-pool-size", 0, "Number of worker goroutines (0 = auto: pairs-per-sec * 2)") + flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)") flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)") } @@ -54,6 +58,8 @@ func main() { ExchangeDuration: exchangeDuration, MessageInterval: messageInterval, InsecureSkipVerify: insecureSkipVerify, + WorkerPoolSize: workerPoolSize, + ChannelBufferSize: channelBufferSize, } if err := validateConfig(config); err != nil { diff --git a/signal/loadtest/rate_loadtest.go b/signal/loadtest/rate_loadtest.go index be21ca5d4..cd93646f2 100644 --- a/signal/loadtest/rate_loadtest.go +++ b/signal/loadtest/rate_loadtest.go @@ -21,6 +21,8 @@ type LoadTestConfig struct { MessageInterval time.Duration RampUpDuration time.Duration InsecureSkipVerify bool + WorkerPoolSize int + ChannelBufferSize int } // LoadTestMetrics metrics collected during the load test @@ -74,13 +76,25 @@ func (lt *LoadTest) Run() error { if lt.config.ExchangeDuration > 0 { exchangeInfo = fmt.Sprintf("continuous for %v", lt.config.ExchangeDuration) } + + workerPoolSize := lt.config.WorkerPoolSize + if workerPoolSize == 0 { + workerPoolSize = lt.config.PairsPerSecond * 2 + } + + channelBufferSize := lt.config.ChannelBufferSize + if channelBufferSize == 0 { + channelBufferSize = lt.config.PairsPerSecond * 4 + } + log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes, exchange: %s", lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize, exchangeInfo) + log.Infof("Worker pool size: %d, channel buffer: %d", workerPoolSize, channelBufferSize) var wg sync.WaitGroup - pairChan := make(chan int, lt.config.PairsPerSecond) + pairChan := make(chan int, channelBufferSize) - for i := 0; i < lt.config.PairsPerSecond; i++ { + for i := 0; i < workerPoolSize; i++ { wg.Add(1) go lt.pairWorker(&wg, pairChan) }