diff --git a/signal/loadtest/README.md b/signal/loadtest/README.md index 46027a5ba..f38355734 100644 --- a/signal/loadtest/README.md +++ b/signal/loadtest/README.md @@ -79,6 +79,15 @@ go build -o signal-loadtest -exchange-duration 60s \ -log-level info +# Progress reporting - report every 5000 messages +./signal-loadtest \ + -server http://localhost:10000 \ + -pairs-per-sec 50 \ + -total-pairs 100 \ + -exchange-duration 5m \ + -report-interval 5000 \ + -log-level info + # Show help ./signal-loadtest -h ``` @@ -93,6 +102,7 @@ go build -o signal-loadtest - `-message-interval`: Interval between messages in continuous mode (default: 100ms) - `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0) - `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0) +- `-report-interval`: Report progress every N messages, 0 = no periodic reports (default: 10000) - `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false) - `-log-level`: Log level: trace, debug, info, warn, error (default: info) @@ -187,6 +197,7 @@ func main() { - **MessageInterval**: Interval between messages in continuous mode (default: 100ms) - **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2) - **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4) +- **ReportInterval**: Report progress every N messages (0 = no periodic reports, default: 10000) - **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates) - **RampUpDuration**: Gradual ramp-up period (not yet implemented) diff --git a/signal/loadtest/cmd/signal-loadtest/main.go b/signal/loadtest/cmd/signal-loadtest/main.go index f7abf1346..6edbe607e 100644 --- a/signal/loadtest/cmd/signal-loadtest/main.go +++ b/signal/loadtest/cmd/signal-loadtest/main.go @@ -22,6 +22,7 @@ var ( insecureSkipVerify bool workerPoolSize int channelBufferSize int + reportInterval int logLevel string ) @@ -36,6 +37,7 @@ func init() { flag.BoolVar(&insecureSkipVerify, "insecure-skip-verify", false, "Skip TLS certificate verification (use with self-signed certificates)") flag.IntVar(&workerPoolSize, "worker-pool-size", 0, "Number of worker goroutines (0 = auto: pairs-per-sec * 2)") flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)") + flag.IntVar(&reportInterval, "report-interval", 10000, "Report progress every N messages (0 = no periodic reports)") flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)") } @@ -60,6 +62,7 @@ func main() { InsecureSkipVerify: insecureSkipVerify, WorkerPoolSize: workerPoolSize, ChannelBufferSize: channelBufferSize, + ReportInterval: reportInterval, } if err := validateConfig(config); err != nil { diff --git a/signal/loadtest/cmd/signal-loadtest/test.sh b/signal/loadtest/cmd/signal-loadtest/test.sh index 947047580..9a9718846 100644 --- a/signal/loadtest/cmd/signal-loadtest/test.sh +++ b/signal/loadtest/cmd/signal-loadtest/test.sh @@ -24,5 +24,17 @@ echo "=== Test 2: Continuous exchange (3 pairs, 5 seconds) ===" -message-interval 200ms \ -log-level info +echo "" +echo "=== Test 3: Progress reporting (10 pairs, 10s, report every 100 messages) ===" +./signal-loadtest \ + -server http://localhost:10000 \ + -pairs-per-sec 10 \ + -total-pairs 10 \ + -message-size 100 \ + -exchange-duration 10s \ + -message-interval 100ms \ + -report-interval 100 \ + -log-level info + echo "" echo "All tests completed!" diff --git a/signal/loadtest/rate_loadtest.go b/signal/loadtest/rate_loadtest.go index cd93646f2..2c05f2efb 100644 --- a/signal/loadtest/rate_loadtest.go +++ b/signal/loadtest/rate_loadtest.go @@ -23,6 +23,7 @@ type LoadTestConfig struct { InsecureSkipVerify bool WorkerPoolSize int ChannelBufferSize int + ReportInterval int // Report progress every N messages (0 = no periodic reports) } // LoadTestMetrics metrics collected during the load test @@ -48,20 +49,25 @@ type PeerPair struct { // LoadTest manages the load test execution type LoadTest struct { - config LoadTestConfig - metrics *LoadTestMetrics - ctx context.Context - cancel context.CancelFunc + config LoadTestConfig + metrics *LoadTestMetrics + ctx context.Context + cancel context.CancelFunc + reporterCtx context.Context + reporterCancel context.CancelFunc } // NewLoadTest creates a new load test instance func NewLoadTest(config LoadTestConfig) *LoadTest { ctx, cancel := context.WithCancel(context.Background()) + reporterCtx, reporterCancel := context.WithCancel(context.Background()) return &LoadTest{ - config: config, - metrics: &LoadTestMetrics{}, - ctx: ctx, - cancel: cancel, + config: config, + metrics: &LoadTestMetrics{}, + ctx: ctx, + cancel: cancel, + reporterCtx: reporterCtx, + reporterCancel: reporterCancel, } } @@ -94,6 +100,12 @@ func (lt *LoadTest) Run() error { var wg sync.WaitGroup pairChan := make(chan int, channelBufferSize) + // Start progress reporter if configured + if lt.config.ReportInterval > 0 { + wg.Add(1) + go lt.progressReporter(&wg, lt.config.ReportInterval) + } + for i := 0; i < workerPoolSize; i++ { wg.Add(1) go lt.pairWorker(&wg, pairChan) @@ -129,6 +141,10 @@ func (lt *LoadTest) Run() error { log.Infof("All %d pairs queued, waiting for completion...", pairsCreated) close(pairChan) + + // Cancel progress reporter context to stop it before waiting + lt.reporterCancel() + wg.Wait() return nil @@ -320,9 +336,43 @@ func (lt *LoadTest) recordLatency(latency time.Duration) { lt.metrics.latencies = append(lt.metrics.latencies, latency) } +// progressReporter prints periodic progress reports +func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) { + defer wg.Done() + + lastReported := int64(0) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-lt.reporterCtx.Done(): + return + case <-ticker.C: + currentMessages := lt.metrics.TotalMessagesExchanged.Load() + if currentMessages-lastReported >= int64(interval) { + elapsed := time.Since(lt.metrics.startTime) + pairs := lt.metrics.SuccessfulExchanges.Load() + errors := lt.metrics.TotalErrors.Load() + + var msgRate float64 + if elapsed.Seconds() > 0 { + msgRate = float64(currentMessages) / elapsed.Seconds() + } + + log.Infof("Progress: %d messages exchanged, %d pairs completed, %d errors, %.2f msg/sec, elapsed: %v", + currentMessages, pairs, errors, msgRate, elapsed.Round(time.Second)) + + lastReported = (currentMessages / int64(interval)) * int64(interval) + } + } + } +} + // Stop stops the load test func (lt *LoadTest) Stop() { lt.cancel() + lt.reporterCancel() } // GetMetrics returns the collected metrics diff --git a/signal/loadtest/rate_loadtest_test.go b/signal/loadtest/rate_loadtest_test.go index 3f89f6060..c2e1a13ad 100644 --- a/signal/loadtest/rate_loadtest_test.go +++ b/signal/loadtest/rate_loadtest_test.go @@ -182,6 +182,7 @@ func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) { ExchangeDuration: 3 * time.Second, MessageInterval: 100 * time.Millisecond, TestDuration: 10 * time.Second, + ReportInterval: 50, // Report every 50 messages for testing } loadTest := NewLoadTest(config)