mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 15:26:40 +00:00
add rate load test
This commit is contained in:
143
signal/loadtest/README.md
Normal file
143
signal/loadtest/README.md
Normal file
@@ -0,0 +1,143 @@
|
||||
# Signal Server Load Test
|
||||
|
||||
Load testing tool for the NetBird signal server.
|
||||
|
||||
## Features
|
||||
|
||||
- **Rate-based peer pair creation**: Spawn peer pairs at configurable rates (e.g., 10, 20 pairs/sec)
|
||||
- **Message exchange validation**: Each pair exchanges one message and validates encrypted body size > 0
|
||||
- **Comprehensive metrics**: Tracks throughput, success/failure rates, and latency statistics
|
||||
- **Local server testing**: Tests include embedded signal server for easy development
|
||||
|
||||
## Usage
|
||||
|
||||
### Running Tests
|
||||
|
||||
```bash
|
||||
# Run all tests (includes load tests)
|
||||
go test -v -timeout 60s
|
||||
|
||||
# Run specific load test
|
||||
go test -v -run TestLoadTest_10PairsPerSecond -timeout 40s
|
||||
go test -v -run TestLoadTest_20PairsPerSecond -timeout 40s
|
||||
go test -v -run TestLoadTest_SmallBurst -timeout 30s
|
||||
|
||||
# Skip the 10 and 20 pairs/sec load tests in quick runs (TestLoadTest_SmallBurst has no -short guard and still runs)
|
||||
go test -short
|
||||
```
|
||||
|
||||
### Programmatic Usage
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/netbirdio/netbird/signal/loadtest"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
config := loadtest.LoadTestConfig{
|
||||
ServerURL: "http://localhost:10000",
|
||||
PairsPerSecond: 10,
|
||||
TotalPairs: 100,
|
||||
MessageSize: 100,
|
||||
TestDuration: 30 * time.Second,
|
||||
}
|
||||
|
||||
lt := loadtest.NewLoadTest(config)
|
||||
if err := lt.Run(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
metrics := lt.GetMetrics()
|
||||
metrics.PrintReport()
|
||||
}
|
||||
```
|
||||
|
||||
## Configuration Options
|
||||
|
||||
- **ServerURL**: Signal server URL (e.g., `http://localhost:10000` or `https://signal.example.com:443`; note that TLS/HTTPS support is still listed under Future Enhancements)
|
||||
- **PairsPerSecond**: Rate at which peer pairs are created (e.g., 10, 20)
|
||||
- **TotalPairs**: Total number of peer pairs to create
|
||||
- **MessageSize**: Size of test message payload in bytes
|
||||
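- **TestDuration**: Maximum time allowed for queuing pairs (optional, 0 = no limit); if it elapses before all pairs are queued, `Run` waits for in-flight pairs and returns the context deadline error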
|
||||
- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
|
||||
|
||||
## Metrics
|
||||
|
||||
The load test collects and reports:
|
||||
|
||||
- **Total Pairs Sent**: Number of peer pairs attempted
|
||||
- **Successful Exchanges**: Completed message exchanges
|
||||
- **Failed Exchanges**: Failed message exchanges
|
||||
- **Total Messages Exchanged**: Count of successfully exchanged messages
|
||||
- **Total Errors**: Cumulative error count
|
||||
- **Throughput**: Successful exchanges per second over the whole test duration (actual, not the configured rate)
|
||||
- **Latency Statistics**: Min, Max, Avg message exchange latency
|
||||
|
||||
## Test Results
|
||||
|
||||
Example output from a 20 pairs/sec test:
|
||||
|
||||
```
|
||||
=== Load Test Report ===
|
||||
Test Duration: 5.055249917s
|
||||
Total Pairs Sent: 100
|
||||
Successful Exchanges: 100
|
||||
Failed Exchanges: 0
|
||||
Total Messages Exchanged: 100
|
||||
Total Errors: 0
|
||||
Throughput: 19.78 pairs/sec
|
||||
|
||||
Latency Statistics:
|
||||
Min: 170.375µs
|
||||
Max: 5.176916ms
|
||||
Avg: 441.566µs
|
||||
========================
|
||||
```
|
||||
|
||||
## Architecture
|
||||
|
||||
### Client (`client.go`)
|
||||
- Manages gRPC connection to signal server
|
||||
- Establishes bidirectional stream for receiving messages
|
||||
- Sends messages via `Send` RPC method
|
||||
- Handles message reception asynchronously
|
||||
|
||||
### Load Test Engine (`rate_loadtest.go`)
|
||||
- Worker pool pattern for concurrent peer pairs
|
||||
- Rate-limited pair creation using ticker
|
||||
- Atomic counters for thread-safe metrics collection
|
||||
- Graceful shutdown on context cancellation
|
||||
|
||||
### Test Suite
|
||||
- `loadtest_test.go`: Single pair validation test
|
||||
- `rate_loadtest_test.go`: Multiple rate-based load tests and benchmarks
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Message Flow
|
||||
1. Create sender and receiver clients with unique IDs
|
||||
2. Both clients connect to signal server via bidirectional stream
|
||||
3. Sender sends encrypted message using `Send` RPC
|
||||
4. Signal server forwards message to receiver's stream
|
||||
5. Receiver reads message from stream
|
||||
6. Validate encrypted body size > 0
|
||||
7. Record latency and success metrics
|
||||
|
||||
### Concurrency
|
||||
- Worker pool size = `PairsPerSecond`
|
||||
- Each worker handles multiple peer pairs sequentially
|
||||
- Atomic operations for metrics to avoid lock contention
|
||||
- Channel-based work distribution
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
- [ ] TLS/HTTPS support for production servers
|
||||
- [ ] Ramp-up period implementation
|
||||
- [ ] Percentile latency metrics (p50, p95, p99)
|
||||
- [ ] Connection reuse for multiple messages per pair
|
||||
- [ ] Support for custom message payloads
|
||||
- [ ] CSV/JSON metrics export
|
||||
- [ ] Real-time metrics dashboard
|
||||
257
signal/loadtest/rate_loadtest.go
Normal file
257
signal/loadtest/rate_loadtest.go
Normal file
@@ -0,0 +1,257 @@
|
||||
package loadtest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// LoadTestConfig holds the tunable parameters of a rate-based load test.
type LoadTestConfig struct {
	// ServerURL is the signal server address passed to NewClient for both
	// the sender and the receiver of every pair.
	ServerURL string
	// PairsPerSecond is the rate at which pair IDs are queued. Run also uses
	// it as the number of workers and as the queue capacity.
	PairsPerSecond int
	// TotalPairs is the number of sender/receiver pairs to queue.
	TotalPairs int
	// MessageSize is the length in bytes of the generated test payload.
	MessageSize int
	// TestDuration bounds how long Run keeps queuing pairs. A value that is
	// not positive disables the limit.
	TestDuration time.Duration
	// RampUpDuration is reserved for a gradual ramp-up period; Run does not
	// read it.
	RampUpDuration time.Duration
}
|
||||
|
||||
// LoadTestMetrics accumulates the counters and latency samples of one load
// test run. The exported counters are atomics and may be updated from many
// workers at once; the latency slice is guarded by mu. Because it embeds
// atomics and a mutex, a LoadTestMetrics must be shared by pointer, never copied.
type LoadTestMetrics struct {
	// TotalPairsSent counts pairs whose exchange has finished, successful or not.
	TotalPairsSent atomic.Int64
	// TotalMessagesExchanged counts messages that reached the receiver.
	TotalMessagesExchanged atomic.Int64
	// TotalErrors counts exchanges that returned an error.
	TotalErrors atomic.Int64
	// SuccessfulExchanges counts exchanges that completed without error.
	SuccessfulExchanges atomic.Int64
	// FailedExchanges counts exchanges that completed with an error.
	FailedExchanges atomic.Int64

	// mu protects latencies.
	mu sync.Mutex
	// latencies holds one send-to-receive duration per successful exchange.
	latencies []time.Duration
	// startTime and endTime bracket the Run call.
	startTime time.Time
	endTime   time.Time
}
|
||||
|
||||
// PeerPair represents a sender-receiver pair
|
||||
type PeerPair struct {
|
||||
sender *Client
|
||||
receiver *Client
|
||||
pairID int
|
||||
}
|
||||
|
||||
// LoadTest manages the load test execution
|
||||
type LoadTest struct {
|
||||
config LoadTestConfig
|
||||
metrics *LoadTestMetrics
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewLoadTest creates a new load test instance
|
||||
func NewLoadTest(config LoadTestConfig) *LoadTest {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &LoadTest{
|
||||
config: config,
|
||||
metrics: &LoadTestMetrics{},
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
// Run executes the load test
|
||||
func (lt *LoadTest) Run() error {
|
||||
lt.metrics.startTime = time.Now()
|
||||
defer func() {
|
||||
lt.metrics.endTime = time.Now()
|
||||
}()
|
||||
|
||||
log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes",
|
||||
lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
pairChan := make(chan int, lt.config.PairsPerSecond)
|
||||
|
||||
for i := 0; i < lt.config.PairsPerSecond; i++ {
|
||||
wg.Add(1)
|
||||
go lt.pairWorker(&wg, pairChan)
|
||||
}
|
||||
|
||||
testCtx := lt.ctx
|
||||
if lt.config.TestDuration > 0 {
|
||||
var testCancel context.CancelFunc
|
||||
testCtx, testCancel = context.WithTimeout(lt.ctx, lt.config.TestDuration)
|
||||
defer testCancel()
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(time.Second / time.Duration(lt.config.PairsPerSecond))
|
||||
defer ticker.Stop()
|
||||
|
||||
pairsCreated := 0
|
||||
for pairsCreated < lt.config.TotalPairs {
|
||||
select {
|
||||
case <-testCtx.Done():
|
||||
log.Infof("Test duration reached or context cancelled")
|
||||
close(pairChan)
|
||||
wg.Wait()
|
||||
return testCtx.Err()
|
||||
case <-ticker.C:
|
||||
select {
|
||||
case pairChan <- pairsCreated:
|
||||
pairsCreated++
|
||||
default:
|
||||
log.Warnf("Worker pool saturated, skipping pair creation")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.Infof("All %d pairs queued, waiting for completion...", pairsCreated)
|
||||
close(pairChan)
|
||||
wg.Wait()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) {
|
||||
defer wg.Done()
|
||||
|
||||
for pairID := range pairChan {
|
||||
if err := lt.executePairExchange(pairID); err != nil {
|
||||
lt.metrics.TotalErrors.Add(1)
|
||||
lt.metrics.FailedExchanges.Add(1)
|
||||
log.Debugf("Pair %d exchange failed: %v", pairID, err)
|
||||
} else {
|
||||
lt.metrics.SuccessfulExchanges.Add(1)
|
||||
}
|
||||
lt.metrics.TotalPairsSent.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (lt *LoadTest) executePairExchange(pairID int) error {
|
||||
senderID := fmt.Sprintf("sender-%d", pairID)
|
||||
receiverID := fmt.Sprintf("receiver-%d", pairID)
|
||||
|
||||
sender, err := NewClient(lt.config.ServerURL, senderID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create sender: %w", err)
|
||||
}
|
||||
defer sender.Close()
|
||||
|
||||
receiver, err := NewClient(lt.config.ServerURL, receiverID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create receiver: %w", err)
|
||||
}
|
||||
defer receiver.Close()
|
||||
|
||||
if err := sender.Connect(); err != nil {
|
||||
return fmt.Errorf("sender connect: %w", err)
|
||||
}
|
||||
|
||||
if err := receiver.Connect(); err != nil {
|
||||
return fmt.Errorf("receiver connect: %w", err)
|
||||
}
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
testMessage := make([]byte, lt.config.MessageSize)
|
||||
for i := range testMessage {
|
||||
testMessage[i] = byte(i % 256)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
if err := sender.SendMessage(receiverID, testMessage); err != nil {
|
||||
return fmt.Errorf("send message: %w", err)
|
||||
}
|
||||
|
||||
receiveDone := make(chan error, 1)
|
||||
go func() {
|
||||
msg, err := receiver.ReceiveMessage()
|
||||
if err != nil {
|
||||
receiveDone <- err
|
||||
return
|
||||
}
|
||||
if len(msg.Body) == 0 {
|
||||
receiveDone <- fmt.Errorf("empty message body")
|
||||
return
|
||||
}
|
||||
receiveDone <- nil
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-receiveDone:
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive message: %w", err)
|
||||
}
|
||||
latency := time.Since(startTime)
|
||||
lt.recordLatency(latency)
|
||||
lt.metrics.TotalMessagesExchanged.Add(1)
|
||||
return nil
|
||||
case <-time.After(5 * time.Second):
|
||||
return fmt.Errorf("timeout waiting for message")
|
||||
case <-lt.ctx.Done():
|
||||
return lt.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (lt *LoadTest) recordLatency(latency time.Duration) {
|
||||
lt.metrics.mu.Lock()
|
||||
defer lt.metrics.mu.Unlock()
|
||||
lt.metrics.latencies = append(lt.metrics.latencies, latency)
|
||||
}
|
||||
|
||||
// Stop stops the load test
|
||||
func (lt *LoadTest) Stop() {
|
||||
lt.cancel()
|
||||
}
|
||||
|
||||
// GetMetrics returns the collected metrics
|
||||
func (lt *LoadTest) GetMetrics() *LoadTestMetrics {
|
||||
return lt.metrics
|
||||
}
|
||||
|
||||
// PrintReport prints a summary report of the test results
|
||||
func (m *LoadTestMetrics) PrintReport() {
|
||||
duration := m.endTime.Sub(m.startTime)
|
||||
|
||||
fmt.Println("\n=== Load Test Report ===")
|
||||
fmt.Printf("Test Duration: %v\n", duration)
|
||||
fmt.Printf("Total Pairs Sent: %d\n", m.TotalPairsSent.Load())
|
||||
fmt.Printf("Successful Exchanges: %d\n", m.SuccessfulExchanges.Load())
|
||||
fmt.Printf("Failed Exchanges: %d\n", m.FailedExchanges.Load())
|
||||
fmt.Printf("Total Messages Exchanged: %d\n", m.TotalMessagesExchanged.Load())
|
||||
fmt.Printf("Total Errors: %d\n", m.TotalErrors.Load())
|
||||
|
||||
if duration.Seconds() > 0 {
|
||||
throughput := float64(m.SuccessfulExchanges.Load()) / duration.Seconds()
|
||||
fmt.Printf("Throughput: %.2f pairs/sec\n", throughput)
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
latencies := m.latencies
|
||||
m.mu.Unlock()
|
||||
|
||||
if len(latencies) > 0 {
|
||||
var total time.Duration
|
||||
min := latencies[0]
|
||||
max := latencies[0]
|
||||
|
||||
for _, lat := range latencies {
|
||||
total += lat
|
||||
if lat < min {
|
||||
min = lat
|
||||
}
|
||||
if lat > max {
|
||||
max = lat
|
||||
}
|
||||
}
|
||||
|
||||
avg := total / time.Duration(len(latencies))
|
||||
fmt.Printf("\nLatency Statistics:\n")
|
||||
fmt.Printf(" Min: %v\n", min)
|
||||
fmt.Printf(" Max: %v\n", max)
|
||||
fmt.Printf(" Avg: %v\n", avg)
|
||||
}
|
||||
fmt.Println("========================")
|
||||
}
|
||||
174
signal/loadtest/rate_loadtest_test.go
Normal file
174
signal/loadtest/rate_loadtest_test.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package loadtest
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/signal/server"
|
||||
)
|
||||
|
||||
func TestLoadTest_10PairsPerSecond(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping load test in short mode")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||
defer grpcServer.Stop()
|
||||
|
||||
config := LoadTestConfig{
|
||||
ServerURL: serverAddr,
|
||||
PairsPerSecond: 10,
|
||||
TotalPairs: 50,
|
||||
MessageSize: 100,
|
||||
TestDuration: 30 * time.Second,
|
||||
}
|
||||
|
||||
loadTest := NewLoadTest(config)
|
||||
err := loadTest.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := loadTest.GetMetrics()
|
||||
metrics.PrintReport()
|
||||
|
||||
require.Equal(t, int64(50), metrics.TotalPairsSent.Load(), "Should send all 50 pairs")
|
||||
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
||||
require.Equal(t, metrics.TotalMessagesExchanged.Load(), metrics.SuccessfulExchanges.Load(), "Messages exchanged should match successful exchanges")
|
||||
}
|
||||
|
||||
func TestLoadTest_20PairsPerSecond(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping load test in short mode")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||
defer grpcServer.Stop()
|
||||
|
||||
config := LoadTestConfig{
|
||||
ServerURL: serverAddr,
|
||||
PairsPerSecond: 20,
|
||||
TotalPairs: 100,
|
||||
MessageSize: 500,
|
||||
TestDuration: 30 * time.Second,
|
||||
}
|
||||
|
||||
loadTest := NewLoadTest(config)
|
||||
err := loadTest.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := loadTest.GetMetrics()
|
||||
metrics.PrintReport()
|
||||
|
||||
require.Equal(t, int64(100), metrics.TotalPairsSent.Load(), "Should send all 100 pairs")
|
||||
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
||||
}
|
||||
|
||||
func TestLoadTest_SmallBurst(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||
defer grpcServer.Stop()
|
||||
|
||||
config := LoadTestConfig{
|
||||
ServerURL: serverAddr,
|
||||
PairsPerSecond: 5,
|
||||
TotalPairs: 10,
|
||||
MessageSize: 50,
|
||||
TestDuration: 10 * time.Second,
|
||||
}
|
||||
|
||||
loadTest := NewLoadTest(config)
|
||||
err := loadTest.Run()
|
||||
require.NoError(t, err)
|
||||
|
||||
metrics := loadTest.GetMetrics()
|
||||
metrics.PrintReport()
|
||||
|
||||
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
|
||||
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(5), "At least 50% success rate")
|
||||
require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
|
||||
}
|
||||
|
||||
func BenchmarkLoadTest_Throughput(b *testing.B) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
grpcServer, serverAddr := startBenchSignalServer(b, ctx)
|
||||
defer grpcServer.Stop()
|
||||
|
||||
b.Run("5pairs-per-sec", func(b *testing.B) {
|
||||
config := LoadTestConfig{
|
||||
ServerURL: serverAddr,
|
||||
PairsPerSecond: 5,
|
||||
TotalPairs: b.N,
|
||||
MessageSize: 100,
|
||||
}
|
||||
|
||||
loadTest := NewLoadTest(config)
|
||||
b.ResetTimer()
|
||||
_ = loadTest.Run()
|
||||
b.StopTimer()
|
||||
|
||||
metrics := loadTest.GetMetrics()
|
||||
b.ReportMetric(float64(metrics.SuccessfulExchanges.Load()), "successful")
|
||||
b.ReportMetric(float64(metrics.FailedExchanges.Load()), "failed")
|
||||
})
|
||||
}
|
||||
|
||||
func startTestSignalServerForLoad(t *testing.T, ctx context.Context) (*grpc.Server, string) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
|
||||
signalServer, err := server.NewServer(ctx, otel.Meter("test"))
|
||||
require.NoError(t, err)
|
||||
|
||||
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||
|
||||
go func() {
|
||||
if err := grpcServer.Serve(listener); err != nil {
|
||||
t.Logf("Server stopped: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||
}
|
||||
|
||||
func startBenchSignalServer(b *testing.B, ctx context.Context) (*grpc.Server, string) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(b, err)
|
||||
|
||||
grpcServer := grpc.NewServer()
|
||||
|
||||
signalServer, err := server.NewServer(ctx, otel.Meter("bench"))
|
||||
require.NoError(b, err)
|
||||
|
||||
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||
|
||||
go func() {
|
||||
if err := grpcServer.Serve(listener); err != nil {
|
||||
b.Logf("Server stopped: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||
}
|
||||
Reference in New Issue
Block a user