From d1153b5b5d83ce92821a29f39b2f4916cc42557f Mon Sep 17 00:00:00 2001
From: Maycon Santos
Date: Thu, 9 Oct 2025 23:09:55 +0200
Subject: [PATCH] add rate load test

---
 signal/loadtest/README.md             | 143 ++++++++++++++
 signal/loadtest/rate_loadtest.go      | 257 ++++++++++++++++++++++++++
 signal/loadtest/rate_loadtest_test.go | 174 +++++++++++++++++
 3 files changed, 574 insertions(+)
 create mode 100644 signal/loadtest/README.md
 create mode 100644 signal/loadtest/rate_loadtest.go
 create mode 100644 signal/loadtest/rate_loadtest_test.go

diff --git a/signal/loadtest/README.md b/signal/loadtest/README.md
new file mode 100644
index 000000000..bdd102e91
--- /dev/null
+++ b/signal/loadtest/README.md
@@ -0,0 +1,143 @@
+# Signal Server Load Test
+
+Load testing tool for the NetBird signal server.
+
+## Features
+
+- **Rate-based peer pair creation**: Spawn peer pairs at configurable rates (e.g., 10, 20 pairs/sec)
+- **Message exchange validation**: Each pair exchanges one message and validates encrypted body size > 0
+- **Comprehensive metrics**: Tracks throughput, success/failure rates, and latency statistics
+- **Local server testing**: Tests include embedded signal server for easy development
+
+## Usage
+
+### Running Tests
+
+```bash
+# Run all tests (includes load tests)
+go test -v -timeout 60s
+
+# Run specific load test
+go test -v -run TestLoadTest_10PairsPerSecond -timeout 40s
+go test -v -run TestLoadTest_20PairsPerSecond -timeout 40s
+go test -v -run TestLoadTest_SmallBurst -timeout 30s
+
+# Skip load tests in quick runs
+go test -short
+```
+
+### Programmatic Usage
+
+```go
+package main
+
+import (
+	"github.com/netbirdio/netbird/signal/loadtest"
+	"time"
+)
+
+func main() {
+	config := loadtest.LoadTestConfig{
+		ServerURL:      "http://localhost:10000",
+		PairsPerSecond: 10,
+		TotalPairs:     100,
+		MessageSize:    100,
+		TestDuration:   30 * time.Second,
+	}
+
+	lt := loadtest.NewLoadTest(config)
+	if err := lt.Run(); err != nil {
+		panic(err)
+	}
+
+	metrics := lt.GetMetrics()
+	metrics.PrintReport()
+}
+```
+
+## Configuration Options
+
+- **ServerURL**: Signal server URL (e.g., `http://localhost:10000` or `https://signal.example.com:443`)
+- **PairsPerSecond**: Rate at which peer pairs are created (e.g., 10, 20)
+- **TotalPairs**: Total number of peer pairs to create
+- **MessageSize**: Size of test message payload in bytes
+- **TestDuration**: Maximum test duration (optional, 0 = no limit)
+- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
+
+## Metrics
+
+The load test collects and reports:
+
+- **Total Pairs Sent**: Number of peer pairs attempted
+- **Successful Exchanges**: Completed message exchanges
+- **Failed Exchanges**: Failed message exchanges
+- **Total Messages Exchanged**: Count of successfully exchanged messages
+- **Total Errors**: Cumulative error count
+- **Throughput**: Pairs per second (actual)
+- **Latency Statistics**: Min, Max, Avg message exchange latency
+
+## Test Results
+
+Example output from a 20 pairs/sec test:
+
+```
+=== Load Test Report ===
+Test Duration: 5.055249917s
+Total Pairs Sent: 100
+Successful Exchanges: 100
+Failed Exchanges: 0
+Total Messages Exchanged: 100
+Total Errors: 0
+Throughput: 19.78 pairs/sec
+
+Latency Statistics:
+  Min: 170.375µs
+  Max: 5.176916ms
+  Avg: 441.566µs
+========================
+```
+
+## Architecture
+
+### Client (`client.go`)
+- Manages gRPC connection to signal server
+- Establishes bidirectional stream for receiving messages
+- Sends messages via `Send` RPC method
+- Handles message reception asynchronously
+
+### Load Test Engine (`rate_loadtest.go`)
+- Worker pool pattern for concurrent peer pairs
+- Rate-limited pair creation using ticker
+- Atomic counters for thread-safe metrics collection
+- Graceful shutdown on context cancellation
+
+### Test Suite
+- `loadtest_test.go`: Single pair validation test
+- `rate_loadtest_test.go`: Multiple rate-based load tests and benchmarks
+
+## Implementation Details
+
+### Message Flow
+1.
Create sender and receiver clients with unique IDs
+2. Both clients connect to signal server via bidirectional stream
+3. Sender sends encrypted message using `Send` RPC
+4. Signal server forwards message to receiver's stream
+5. Receiver reads message from stream
+6. Validate encrypted body size > 0
+7. Record latency and success metrics
+
+### Concurrency
+- Worker pool size = `PairsPerSecond`
+- Each worker handles multiple peer pairs sequentially
+- Atomic operations for metrics to avoid lock contention
+- Channel-based work distribution
+
+## Future Enhancements
+
+- [ ] TLS/HTTPS support for production servers
+- [ ] Ramp-up period implementation
+- [ ] Percentile latency metrics (p50, p95, p99)
+- [ ] Connection reuse for multiple messages per pair
+- [ ] Support for custom message payloads
+- [ ] CSV/JSON metrics export
+- [ ] Real-time metrics dashboard
diff --git a/signal/loadtest/rate_loadtest.go b/signal/loadtest/rate_loadtest.go
new file mode 100644
index 000000000..eed6dff1b
--- /dev/null
+++ b/signal/loadtest/rate_loadtest.go
@@ -0,0 +1,257 @@
+package loadtest
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+// LoadTestConfig configuration for the load test
+type LoadTestConfig struct {
+	ServerURL      string
+	PairsPerSecond int
+	TotalPairs     int
+	MessageSize    int
+	TestDuration   time.Duration
+	RampUpDuration time.Duration
+}
+
+// LoadTestMetrics metrics collected during the load test
+type LoadTestMetrics struct {
+	TotalPairsSent         atomic.Int64
+	TotalMessagesExchanged atomic.Int64
+	TotalErrors            atomic.Int64
+	SuccessfulExchanges    atomic.Int64
+	FailedExchanges        atomic.Int64
+
+	mu        sync.Mutex
+	latencies []time.Duration
+	startTime time.Time
+	endTime   time.Time
+}
+
+// PeerPair represents a sender-receiver pair
+type PeerPair struct {
+	sender   *Client
+	receiver *Client
+	pairID   int
+}
+
+// LoadTest manages the load test execution
+type LoadTest struct {
+	config  LoadTestConfig
+	metrics
*LoadTestMetrics
+	ctx     context.Context
+	cancel  context.CancelFunc
+}
+
+// NewLoadTest creates a new load test instance
+func NewLoadTest(config LoadTestConfig) *LoadTest {
+	ctx, cancel := context.WithCancel(context.Background())
+	return &LoadTest{
+		config:  config,
+		metrics: &LoadTestMetrics{},
+		ctx:     ctx,
+		cancel:  cancel,
+	}
+}
+
+// Run executes the load test
+func (lt *LoadTest) Run() error {
+	lt.metrics.startTime = time.Now()
+	defer func() {
+		lt.metrics.endTime = time.Now()
+	}()
+
+	log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes",
+		lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize)
+
+	var wg sync.WaitGroup
+	pairChan := make(chan int, lt.config.PairsPerSecond)
+
+	for i := 0; i < lt.config.PairsPerSecond; i++ {
+		wg.Add(1)
+		go lt.pairWorker(&wg, pairChan)
+	}
+
+	testCtx := lt.ctx
+	if lt.config.TestDuration > 0 {
+		var testCancel context.CancelFunc
+		testCtx, testCancel = context.WithTimeout(lt.ctx, lt.config.TestDuration)
+		defer testCancel()
+	}
+
+	ticker := time.NewTicker(time.Second / time.Duration(lt.config.PairsPerSecond))
+	defer ticker.Stop()
+
+	pairsCreated := 0
+	for pairsCreated < lt.config.TotalPairs {
+		select {
+		case <-testCtx.Done():
+			log.Infof("Test duration reached or context cancelled")
+			close(pairChan)
+			wg.Wait()
+			return testCtx.Err()
+		case <-ticker.C:
+			select {
+			case pairChan <- pairsCreated:
+				pairsCreated++
+			default:
+				log.Warnf("Worker pool saturated, skipping pair creation")
+			}
+		}
+	}
+
+	log.Infof("All %d pairs queued, waiting for completion...", pairsCreated)
+	close(pairChan)
+	wg.Wait()
+
+	return nil
+}
+
+func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) {
+	defer wg.Done()
+
+	for pairID := range pairChan {
+		if err := lt.executePairExchange(pairID); err != nil {
+			lt.metrics.TotalErrors.Add(1)
+			lt.metrics.FailedExchanges.Add(1)
+			log.Debugf("Pair %d exchange failed: %v", pairID, err)
+		} else {
+
lt.metrics.SuccessfulExchanges.Add(1)
+		}
+		lt.metrics.TotalPairsSent.Add(1)
+	}
+}
+
+func (lt *LoadTest) executePairExchange(pairID int) error {
+	senderID := fmt.Sprintf("sender-%d", pairID)
+	receiverID := fmt.Sprintf("receiver-%d", pairID)
+
+	sender, err := NewClient(lt.config.ServerURL, senderID)
+	if err != nil {
+		return fmt.Errorf("create sender: %w", err)
+	}
+	defer sender.Close()
+
+	receiver, err := NewClient(lt.config.ServerURL, receiverID)
+	if err != nil {
+		return fmt.Errorf("create receiver: %w", err)
+	}
+	defer receiver.Close()
+
+	if err := sender.Connect(); err != nil {
+		return fmt.Errorf("sender connect: %w", err)
+	}
+
+	if err := receiver.Connect(); err != nil {
+		return fmt.Errorf("receiver connect: %w", err)
+	}
+
+	time.Sleep(50 * time.Millisecond)
+
+	testMessage := make([]byte, lt.config.MessageSize)
+	for i := range testMessage {
+		testMessage[i] = byte(i % 256)
+	}
+
+	startTime := time.Now()
+
+	if err := sender.SendMessage(receiverID, testMessage); err != nil {
+		return fmt.Errorf("send message: %w", err)
+	}
+
+	receiveDone := make(chan error, 1)
+	go func() {
+		msg, err := receiver.ReceiveMessage()
+		if err != nil {
+			receiveDone <- err
+			return
+		}
+		if len(msg.Body) == 0 {
+			receiveDone <- fmt.Errorf("empty message body")
+			return
+		}
+		receiveDone <- nil
+	}()
+
+	select {
+	case err := <-receiveDone:
+		if err != nil {
+			return fmt.Errorf("receive message: %w", err)
+		}
+		latency := time.Since(startTime)
+		lt.recordLatency(latency)
+		lt.metrics.TotalMessagesExchanged.Add(1)
+		return nil
+	case <-time.After(5 * time.Second):
+		return fmt.Errorf("timeout waiting for message")
+	case <-lt.ctx.Done():
+		return lt.ctx.Err()
+	}
+}
+
+func (lt *LoadTest) recordLatency(latency time.Duration) {
+	lt.metrics.mu.Lock()
+	defer lt.metrics.mu.Unlock()
+	lt.metrics.latencies = append(lt.metrics.latencies, latency)
+}
+
+// Stop stops the load test
+func (lt *LoadTest) Stop() {
+	lt.cancel()
+}
+
+// GetMetrics returns the collected
metrics
+func (lt *LoadTest) GetMetrics() *LoadTestMetrics {
+	return lt.metrics
+}
+
+// PrintReport prints a summary report of the test results
+func (m *LoadTestMetrics) PrintReport() {
+	duration := m.endTime.Sub(m.startTime)
+
+	fmt.Println("\n=== Load Test Report ===")
+	fmt.Printf("Test Duration: %v\n", duration)
+	fmt.Printf("Total Pairs Sent: %d\n", m.TotalPairsSent.Load())
+	fmt.Printf("Successful Exchanges: %d\n", m.SuccessfulExchanges.Load())
+	fmt.Printf("Failed Exchanges: %d\n", m.FailedExchanges.Load())
+	fmt.Printf("Total Messages Exchanged: %d\n", m.TotalMessagesExchanged.Load())
+	fmt.Printf("Total Errors: %d\n", m.TotalErrors.Load())
+
+	if duration.Seconds() > 0 {
+		throughput := float64(m.SuccessfulExchanges.Load()) / duration.Seconds()
+		fmt.Printf("Throughput: %.2f pairs/sec\n", throughput)
+	}
+
+	m.mu.Lock()
+	latencies := m.latencies
+	m.mu.Unlock()
+
+	if len(latencies) > 0 {
+		var total time.Duration
+		min := latencies[0]
+		max := latencies[0]
+
+		for _, lat := range latencies {
+			total += lat
+			if lat < min {
+				min = lat
+			}
+			if lat > max {
+				max = lat
+			}
+		}
+
+		avg := total / time.Duration(len(latencies))
+		fmt.Printf("\nLatency Statistics:\n")
+		fmt.Printf("  Min: %v\n", min)
+		fmt.Printf("  Max: %v\n", max)
+		fmt.Printf("  Avg: %v\n", avg)
+	}
+	fmt.Println("========================")
+}
diff --git a/signal/loadtest/rate_loadtest_test.go b/signal/loadtest/rate_loadtest_test.go
new file mode 100644
index 000000000..edacafb41
--- /dev/null
+++ b/signal/loadtest/rate_loadtest_test.go
@@ -0,0 +1,174 @@
+package loadtest
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/otel"
+	"google.golang.org/grpc"
+
+	"github.com/netbirdio/netbird/shared/signal/proto"
+	"github.com/netbirdio/netbird/signal/server"
+)
+
+func TestLoadTest_10PairsPerSecond(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping load test in short mode")
+	}
+
+	ctx, cancel :=
context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
+	defer grpcServer.Stop()
+
+	config := LoadTestConfig{
+		ServerURL:      serverAddr,
+		PairsPerSecond: 10,
+		TotalPairs:     50,
+		MessageSize:    100,
+		TestDuration:   30 * time.Second,
+	}
+
+	loadTest := NewLoadTest(config)
+	err := loadTest.Run()
+	require.NoError(t, err)
+
+	metrics := loadTest.GetMetrics()
+	metrics.PrintReport()
+
+	require.Equal(t, int64(50), metrics.TotalPairsSent.Load(), "Should send all 50 pairs")
+	require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
+	require.Equal(t, metrics.TotalMessagesExchanged.Load(), metrics.SuccessfulExchanges.Load(), "Messages exchanged should match successful exchanges")
+}
+
+func TestLoadTest_20PairsPerSecond(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping load test in short mode")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
+	defer grpcServer.Stop()
+
+	config := LoadTestConfig{
+		ServerURL:      serverAddr,
+		PairsPerSecond: 20,
+		TotalPairs:     100,
+		MessageSize:    500,
+		TestDuration:   30 * time.Second,
+	}
+
+	loadTest := NewLoadTest(config)
+	err := loadTest.Run()
+	require.NoError(t, err)
+
+	metrics := loadTest.GetMetrics()
+	metrics.PrintReport()
+
+	require.Equal(t, int64(100), metrics.TotalPairsSent.Load(), "Should send all 100 pairs")
+	require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
+}
+
+func TestLoadTest_SmallBurst(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
+	defer grpcServer.Stop()
+
+	config := LoadTestConfig{
+		ServerURL:      serverAddr,
+		PairsPerSecond: 5,
+		TotalPairs:     10,
+		MessageSize:    50,
+		TestDuration:   10 * time.Second,
+	}
+
+	loadTest := NewLoadTest(config)
+
err := loadTest.Run()
+	require.NoError(t, err)
+
+	metrics := loadTest.GetMetrics()
+	metrics.PrintReport()
+
+	require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
+	require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(5), "At least 50% success rate")
+	require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
+}
+
+func BenchmarkLoadTest_Throughput(b *testing.B) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcServer, serverAddr := startBenchSignalServer(b, ctx)
+	defer grpcServer.Stop()
+
+	b.Run("5pairs-per-sec", func(b *testing.B) {
+		config := LoadTestConfig{
+			ServerURL:      serverAddr,
+			PairsPerSecond: 5,
+			TotalPairs:     b.N,
+			MessageSize:    100,
+		}
+
+		loadTest := NewLoadTest(config)
+		b.ResetTimer()
+		_ = loadTest.Run()
+		b.StopTimer()
+
+		metrics := loadTest.GetMetrics()
+		b.ReportMetric(float64(metrics.SuccessfulExchanges.Load()), "successful")
+		b.ReportMetric(float64(metrics.FailedExchanges.Load()), "failed")
+	})
+}
+
+func startTestSignalServerForLoad(t *testing.T, ctx context.Context) (*grpc.Server, string) {
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(t, err)
+
+	grpcServer := grpc.NewServer()
+
+	signalServer, err := server.NewServer(ctx, otel.Meter("test"))
+	require.NoError(t, err)
+
+	proto.RegisterSignalExchangeServer(grpcServer, signalServer)
+
+	go func() {
+		if err := grpcServer.Serve(listener); err != nil {
+			t.Logf("Server stopped: %v", err)
+		}
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
+}
+
+func startBenchSignalServer(b *testing.B, ctx context.Context) (*grpc.Server, string) {
+	listener, err := net.Listen("tcp", "127.0.0.1:0")
+	require.NoError(b, err)
+
+	grpcServer := grpc.NewServer()
+
+	signalServer, err := server.NewServer(ctx, otel.Meter("bench"))
+	require.NoError(b, err)
+
+	proto.RegisterSignalExchangeServer(grpcServer, signalServer)
+
+	go func() {
+		if err := grpcServer.Serve(listener); err != nil {
+			b.Logf("Server stopped: %v", err)
+		}
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
+}