mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 15:26:40 +00:00
add report on 10k messages
This commit is contained in:
@@ -79,6 +79,15 @@ go build -o signal-loadtest
|
||||
-exchange-duration 60s \
|
||||
-log-level info
|
||||
|
||||
# Progress reporting - report every 5000 messages
|
||||
./signal-loadtest \
|
||||
-server http://localhost:10000 \
|
||||
-pairs-per-sec 50 \
|
||||
-total-pairs 100 \
|
||||
-exchange-duration 5m \
|
||||
-report-interval 5000 \
|
||||
-log-level info
|
||||
|
||||
# Show help
|
||||
./signal-loadtest -h
|
||||
```
|
||||
@@ -93,6 +102,7 @@ go build -o signal-loadtest
|
||||
- `-message-interval`: Interval between messages in continuous mode (default: 100ms)
|
||||
- `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0)
|
||||
- `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0)
|
||||
- `-report-interval`: Report progress every N messages, 0 = no periodic reports (default: 10000)
|
||||
- `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false)
|
||||
- `-log-level`: Log level: trace, debug, info, warn, error (default: info)
|
||||
|
||||
@@ -187,6 +197,7 @@ func main() {
|
||||
- **MessageInterval**: Interval between messages in continuous mode (default: 100ms)
|
||||
- **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2)
|
||||
- **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4)
|
||||
- **ReportInterval**: Report progress every N messages (0 = no periodic reports, default: 10000)
|
||||
- **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates)
|
||||
- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ var (
|
||||
insecureSkipVerify bool
|
||||
workerPoolSize int
|
||||
channelBufferSize int
|
||||
reportInterval int
|
||||
logLevel string
|
||||
)
|
||||
|
||||
@@ -36,6 +37,7 @@ func init() {
|
||||
flag.BoolVar(&insecureSkipVerify, "insecure-skip-verify", false, "Skip TLS certificate verification (use with self-signed certificates)")
|
||||
flag.IntVar(&workerPoolSize, "worker-pool-size", 0, "Number of worker goroutines (0 = auto: pairs-per-sec * 2)")
|
||||
flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)")
|
||||
flag.IntVar(&reportInterval, "report-interval", 10000, "Report progress every N messages (0 = no periodic reports)")
|
||||
flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)")
|
||||
}
|
||||
|
||||
@@ -60,6 +62,7 @@ func main() {
|
||||
InsecureSkipVerify: insecureSkipVerify,
|
||||
WorkerPoolSize: workerPoolSize,
|
||||
ChannelBufferSize: channelBufferSize,
|
||||
ReportInterval: reportInterval,
|
||||
}
|
||||
|
||||
if err := validateConfig(config); err != nil {
|
||||
|
||||
@@ -24,5 +24,17 @@ echo "=== Test 2: Continuous exchange (3 pairs, 5 seconds) ==="
|
||||
-message-interval 200ms \
|
||||
-log-level info
|
||||
|
||||
echo ""
|
||||
echo "=== Test 3: Progress reporting (10 pairs, 10s, report every 100 messages) ==="
|
||||
./signal-loadtest \
|
||||
-server http://localhost:10000 \
|
||||
-pairs-per-sec 10 \
|
||||
-total-pairs 10 \
|
||||
-message-size 100 \
|
||||
-exchange-duration 10s \
|
||||
-message-interval 100ms \
|
||||
-report-interval 100 \
|
||||
-log-level info
|
||||
|
||||
echo ""
|
||||
echo "All tests completed!"
|
||||
|
||||
@@ -23,6 +23,7 @@ type LoadTestConfig struct {
|
||||
InsecureSkipVerify bool
|
||||
WorkerPoolSize int
|
||||
ChannelBufferSize int
|
||||
ReportInterval int // Report progress every N messages (0 = no periodic reports)
|
||||
}
|
||||
|
||||
// LoadTestMetrics metrics collected during the load test
|
||||
@@ -48,20 +49,25 @@ type PeerPair struct {
|
||||
|
||||
// LoadTest manages the load test execution
|
||||
type LoadTest struct {
|
||||
config LoadTestConfig
|
||||
metrics *LoadTestMetrics
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
config LoadTestConfig
|
||||
metrics *LoadTestMetrics
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
reporterCtx context.Context
|
||||
reporterCancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewLoadTest creates a new load test instance
|
||||
func NewLoadTest(config LoadTestConfig) *LoadTest {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
reporterCtx, reporterCancel := context.WithCancel(context.Background())
|
||||
return &LoadTest{
|
||||
config: config,
|
||||
metrics: &LoadTestMetrics{},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
config: config,
|
||||
metrics: &LoadTestMetrics{},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
reporterCtx: reporterCtx,
|
||||
reporterCancel: reporterCancel,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -94,6 +100,12 @@ func (lt *LoadTest) Run() error {
|
||||
var wg sync.WaitGroup
|
||||
pairChan := make(chan int, channelBufferSize)
|
||||
|
||||
// Start progress reporter if configured
|
||||
if lt.config.ReportInterval > 0 {
|
||||
wg.Add(1)
|
||||
go lt.progressReporter(&wg, lt.config.ReportInterval)
|
||||
}
|
||||
|
||||
for i := 0; i < workerPoolSize; i++ {
|
||||
wg.Add(1)
|
||||
go lt.pairWorker(&wg, pairChan)
|
||||
@@ -129,6 +141,10 @@ func (lt *LoadTest) Run() error {
|
||||
|
||||
log.Infof("All %d pairs queued, waiting for completion...", pairsCreated)
|
||||
close(pairChan)
|
||||
|
||||
// Cancel progress reporter context to stop it before waiting
|
||||
lt.reporterCancel()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
@@ -320,9 +336,43 @@ func (lt *LoadTest) recordLatency(latency time.Duration) {
|
||||
lt.metrics.latencies = append(lt.metrics.latencies, latency)
|
||||
}
|
||||
|
||||
// progressReporter prints periodic progress reports
|
||||
func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) {
|
||||
defer wg.Done()
|
||||
|
||||
lastReported := int64(0)
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-lt.reporterCtx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
currentMessages := lt.metrics.TotalMessagesExchanged.Load()
|
||||
if currentMessages-lastReported >= int64(interval) {
|
||||
elapsed := time.Since(lt.metrics.startTime)
|
||||
pairs := lt.metrics.SuccessfulExchanges.Load()
|
||||
errors := lt.metrics.TotalErrors.Load()
|
||||
|
||||
var msgRate float64
|
||||
if elapsed.Seconds() > 0 {
|
||||
msgRate = float64(currentMessages) / elapsed.Seconds()
|
||||
}
|
||||
|
||||
log.Infof("Progress: %d messages exchanged, %d pairs completed, %d errors, %.2f msg/sec, elapsed: %v",
|
||||
currentMessages, pairs, errors, msgRate, elapsed.Round(time.Second))
|
||||
|
||||
lastReported = (currentMessages / int64(interval)) * int64(interval)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the load test
|
||||
func (lt *LoadTest) Stop() {
|
||||
lt.cancel()
|
||||
lt.reporterCancel()
|
||||
}
|
||||
|
||||
// GetMetrics returns the collected metrics
|
||||
|
||||
@@ -182,6 +182,7 @@ func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
|
||||
ExchangeDuration: 3 * time.Second,
|
||||
MessageInterval: 100 * time.Millisecond,
|
||||
TestDuration: 10 * time.Second,
|
||||
ReportInterval: 50, // Report every 50 messages for testing
|
||||
}
|
||||
|
||||
loadTest := NewLoadTest(config)
|
||||
|
||||
Reference in New Issue
Block a user