diff --git a/signal/loadtest/rate_loadtest.go b/signal/loadtest/rate_loadtest.go index 2c05f2efb..1dd140136 100644 --- a/signal/loadtest/rate_loadtest.go +++ b/signal/loadtest/rate_loadtest.go @@ -12,6 +12,7 @@ import ( // LoadTestConfig configuration for the load test type LoadTestConfig struct { + IDPrefix string ServerURL string PairsPerSecond int TotalPairs int @@ -33,6 +34,7 @@ type LoadTestMetrics struct { TotalErrors atomic.Int64 SuccessfulExchanges atomic.Int64 FailedExchanges atomic.Int64 + ActivePairs atomic.Int64 mu sync.Mutex latencies []time.Duration @@ -61,6 +63,7 @@ type LoadTest struct { func NewLoadTest(config LoadTestConfig) *LoadTest { ctx, cancel := context.WithCancel(context.Background()) reporterCtx, reporterCancel := context.WithCancel(context.Background()) + config.IDPrefix = fmt.Sprintf("%d-", time.Now().UnixNano()) return &LoadTest{ config: config, metrics: &LoadTestMetrics{}, @@ -98,12 +101,13 @@ func (lt *LoadTest) Run() error { log.Infof("Worker pool size: %d, channel buffer: %d", workerPoolSize, channelBufferSize) var wg sync.WaitGroup + var reporterWg sync.WaitGroup pairChan := make(chan int, channelBufferSize) // Start progress reporter if configured if lt.config.ReportInterval > 0 { - wg.Add(1) - go lt.progressReporter(&wg, lt.config.ReportInterval) + reporterWg.Add(1) + go lt.progressReporter(&reporterWg, lt.config.ReportInterval) } for i := 0; i < workerPoolSize; i++ { @@ -141,12 +145,12 @@ func (lt *LoadTest) Run() error { log.Infof("All %d pairs queued, waiting for completion...", pairsCreated) close(pairChan) - - // Cancel progress reporter context to stop it before waiting - lt.reporterCancel() - wg.Wait() + // Cancel progress reporter context after all work is done and wait for it + lt.reporterCancel() + reporterWg.Wait() + return nil } @@ -154,6 +158,7 @@ func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) { defer wg.Done() for pairID := range pairChan { + lt.metrics.ActivePairs.Add(1) if err := lt.executePairExchange(pairID); err != nil { lt.metrics.TotalErrors.Add(1) lt.metrics.FailedExchanges.Add(1) @@ -161,13 +166,14 @@ func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) { } else { lt.metrics.SuccessfulExchanges.Add(1) } + lt.metrics.ActivePairs.Add(-1) lt.metrics.TotalPairsSent.Add(1) } } func (lt *LoadTest) executePairExchange(pairID int) error { - senderID := fmt.Sprintf("sender-%d", pairID) - receiverID := fmt.Sprintf("receiver-%d", pairID) + senderID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID) + receiverID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID) clientConfig := &ClientConfig{ InsecureSkipVerify: lt.config.InsecureSkipVerify, @@ -352,7 +358,7 @@ func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) { currentMessages := lt.metrics.TotalMessagesExchanged.Load() if currentMessages-lastReported >= int64(interval) { elapsed := time.Since(lt.metrics.startTime) - pairs := lt.metrics.SuccessfulExchanges.Load() + activePairs := lt.metrics.ActivePairs.Load() errors := lt.metrics.TotalErrors.Load() var msgRate float64 @@ -360,8 +366,8 @@ func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) { msgRate = float64(currentMessages) / elapsed.Seconds() } - log.Infof("Progress: %d messages exchanged, %d pairs completed, %d errors, %.2f msg/sec, elapsed: %v", - currentMessages, pairs, errors, msgRate, elapsed.Round(time.Second)) + log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %.2f msg/sec, elapsed: %v", + currentMessages, activePairs, errors, msgRate, elapsed.Round(time.Second)) lastReported = (currentMessages / int64(interval)) * int64(interval) }