mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-20 09:16:40 +00:00
Compare commits
10 Commits
fix/androi
...
loadtest-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4787e28ae3 | ||
|
|
f9a71e98c7 | ||
|
|
26d1a9b68a | ||
|
|
3d983ddc60 | ||
|
|
9217df05eb | ||
|
|
213043fe7a | ||
|
|
e67829d1d7 | ||
|
|
2288664fe7 | ||
|
|
d1153b5b5d | ||
|
|
6d26c9d1ba |
381
signal/loadtest/README.md
Normal file
381
signal/loadtest/README.md
Normal file
@@ -0,0 +1,381 @@
|
|||||||
|
# Signal Server Load Test
|
||||||
|
|
||||||
|
Load testing tool for the NetBird signal server.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- **Rate-based peer pair creation**: Spawn peer pairs at configurable rates (e.g., 10, 20 pairs/sec)
|
||||||
|
- **Two exchange modes**:
|
||||||
|
- **Single message**: Each pair exchanges one message for validation
|
||||||
|
- **Continuous exchange**: Pairs continuously exchange messages for a specified duration (e.g., 30 seconds, 10 minutes)
|
||||||
|
- **TLS/HTTPS support**: Connect to TLS-enabled signal servers with optional certificate verification
|
||||||
|
- **Automatic reconnection**: Optional automatic reconnection with exponential backoff on connection loss
|
||||||
|
- **Configurable message interval**: Control message send rate in continuous mode
|
||||||
|
- **Message exchange validation**: Validates encrypted body size > 0
|
||||||
|
- **Comprehensive metrics**: Tracks throughput, success/failure rates, latency statistics, and reconnection counts
|
||||||
|
- **Local server testing**: Tests include embedded signal server for easy development
|
||||||
|
- **Worker pool pattern**: Efficient concurrent execution
|
||||||
|
- **Graceful shutdown**: Context-based cancellation
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
### Standalone Binary
|
||||||
|
|
||||||
|
Build and run the load test as a standalone binary:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Build the binary
|
||||||
|
cd signal/loadtest/cmd/signal-loadtest
|
||||||
|
go build -o signal-loadtest
|
||||||
|
|
||||||
|
# Single message exchange
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 10 \
|
||||||
|
-total-pairs 100 \
|
||||||
|
-message-size 100
|
||||||
|
|
||||||
|
# Continuous exchange for 30 seconds
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 10 \
|
||||||
|
-total-pairs 20 \
|
||||||
|
-message-size 200 \
|
||||||
|
-exchange-duration 30s \
|
||||||
|
-message-interval 200ms
|
||||||
|
|
||||||
|
# Long-running test (10 minutes)
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 20 \
|
||||||
|
-total-pairs 50 \
|
||||||
|
-message-size 500 \
|
||||||
|
-exchange-duration 10m \
|
||||||
|
-message-interval 100ms \
|
||||||
|
-test-duration 15m \
|
||||||
|
-log-level debug
|
||||||
|
|
||||||
|
# TLS server with valid certificate
|
||||||
|
./signal-loadtest \
|
||||||
|
-server https://signal.example.com:443 \
|
||||||
|
-pairs-per-sec 10 \
|
||||||
|
-total-pairs 50 \
|
||||||
|
-message-size 100
|
||||||
|
|
||||||
|
# TLS server with self-signed certificate
|
||||||
|
./signal-loadtest \
|
||||||
|
-server https://localhost:443 \
|
||||||
|
-pairs-per-sec 5 \
|
||||||
|
-total-pairs 10 \
|
||||||
|
-insecure-skip-verify \
|
||||||
|
-log-level debug
|
||||||
|
|
||||||
|
# High load test with custom worker pool
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 100 \
|
||||||
|
-total-pairs 1000 \
|
||||||
|
-worker-pool-size 500 \
|
||||||
|
-channel-buffer-size 1000 \
|
||||||
|
-exchange-duration 60s \
|
||||||
|
-log-level info
|
||||||
|
|
||||||
|
# Progress reporting - report every 5000 messages
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 50 \
|
||||||
|
-total-pairs 100 \
|
||||||
|
-exchange-duration 5m \
|
||||||
|
-report-interval 5000 \
|
||||||
|
-log-level info
|
||||||
|
|
||||||
|
# With automatic reconnection
|
||||||
|
./signal-loadtest \
|
||||||
|
-server http://localhost:10000 \
|
||||||
|
-pairs-per-sec 10 \
|
||||||
|
-total-pairs 50 \
|
||||||
|
-exchange-duration 5m \
|
||||||
|
-enable-reconnect \
|
||||||
|
-initial-retry-delay 100ms \
|
||||||
|
-max-reconnect-delay 30s \
|
||||||
|
-log-level debug
|
||||||
|
|
||||||
|
# Show help
|
||||||
|
./signal-loadtest -h
|
||||||
|
```
|
||||||
|
|
||||||
|
**Graceful Shutdown:**
|
||||||
|
|
||||||
|
The load test supports graceful shutdown via Ctrl+C (SIGINT/SIGTERM):
|
||||||
|
- Press Ctrl+C to interrupt the test at any time
|
||||||
|
- All active clients will be closed gracefully
|
||||||
|
- A final aggregated report will be printed showing metrics up to the point of interruption
|
||||||
|
- Shutdown timeout: 5 seconds (after which the process will force exit)
|
||||||
|
|
||||||
|
**Available Flags:**
|
||||||
|
- `-server`: Signal server URL (http:// or https://) (default: `http://localhost:10000`)
|
||||||
|
- `-pairs-per-sec`: Peer pairs created per second (default: 10)
|
||||||
|
- `-total-pairs`: Total number of peer pairs (default: 100)
|
||||||
|
- `-message-size`: Message size in bytes (default: 100)
|
||||||
|
- `-test-duration`: Maximum test duration, 0 = unlimited (default: 0)
|
||||||
|
- `-exchange-duration`: Continuous exchange duration per pair, 0 = single message (default: 0)
|
||||||
|
- `-message-interval`: Interval between messages in continuous mode (default: 100ms)
|
||||||
|
- `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0)
|
||||||
|
- `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0)
|
||||||
|
- `-report-interval`: Report progress every N messages, 0 = no periodic reports (default: 10000)
|
||||||
|
- `-enable-reconnect`: Enable automatic reconnection on connection loss (default: false)
|
||||||
|
- `-initial-retry-delay`: Initial delay before first reconnection attempt (default: 100ms)
|
||||||
|
- `-max-reconnect-delay`: Maximum delay between reconnection attempts (default: 30s)
|
||||||
|
- `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false)
|
||||||
|
- `-log-level`: Log level: trace, debug, info, warn, error (default: info)
|
||||||
|
|
||||||
|
### Running Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run all tests (includes load tests)
|
||||||
|
go test -v -timeout 2m
|
||||||
|
|
||||||
|
# Run specific single-message load tests
|
||||||
|
go test -v -run TestLoadTest_10PairsPerSecond -timeout 40s
|
||||||
|
go test -v -run TestLoadTest_20PairsPerSecond -timeout 40s
|
||||||
|
go test -v -run TestLoadTest_SmallBurst -timeout 30s
|
||||||
|
|
||||||
|
# Run continuous exchange tests
|
||||||
|
go test -v -run TestLoadTest_ContinuousExchange_ShortBurst -timeout 30s
|
||||||
|
go test -v -run TestLoadTest_ContinuousExchange_30Seconds -timeout 2m
|
||||||
|
go test -v -run TestLoadTest_ContinuousExchange_10Minutes -timeout 15m
|
||||||
|
|
||||||
|
# Skip long-running tests in quick runs
|
||||||
|
go test -short
|
||||||
|
```
|
||||||
|
|
||||||
|
### Programmatic Usage
|
||||||
|
|
||||||
|
#### Single Message Exchange
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/netbirdio/netbird/signal/loadtest"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
config := loadtest.LoadTestConfig{
|
||||||
|
ServerURL: "http://localhost:10000",
|
||||||
|
PairsPerSecond: 10,
|
||||||
|
TotalPairs: 100,
|
||||||
|
MessageSize: 100,
|
||||||
|
TestDuration: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
lt := loadtest.NewLoadTest(config)
|
||||||
|
if err := lt.Run(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := lt.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### Continuous Message Exchange
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/netbirdio/netbird/signal/loadtest"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
config := loadtest.LoadTestConfig{
|
||||||
|
ServerURL: "http://localhost:10000",
|
||||||
|
PairsPerSecond: 10,
|
||||||
|
TotalPairs: 20,
|
||||||
|
MessageSize: 200,
|
||||||
|
ExchangeDuration: 10 * time.Minute, // Each pair exchanges messages for 10 minutes
|
||||||
|
MessageInterval: 200 * time.Millisecond, // Send message every 200ms
|
||||||
|
TestDuration: 15 * time.Minute, // Overall test timeout
|
||||||
|
}
|
||||||
|
|
||||||
|
lt := loadtest.NewLoadTest(config)
|
||||||
|
if err := lt.Run(); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := lt.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuration Options
|
||||||
|
|
||||||
|
- **ServerURL**: Signal server URL (e.g., `http://localhost:10000` or `https://signal.example.com:443`)
|
||||||
|
- **PairsPerSecond**: Rate at which peer pairs are created (e.g., 10, 20)
|
||||||
|
- **TotalPairs**: Total number of peer pairs to create
|
||||||
|
- **MessageSize**: Size of test message payload in bytes
|
||||||
|
- **TestDuration**: Maximum test duration (optional, 0 = no limit)
|
||||||
|
- **ExchangeDuration**: Duration for continuous message exchange per pair (0 = single message)
|
||||||
|
- **MessageInterval**: Interval between messages in continuous mode (default: 100ms)
|
||||||
|
- **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2)
|
||||||
|
- **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4)
|
||||||
|
- **ReportInterval**: Report progress every N messages (0 = no periodic reports, default: 10000)
|
||||||
|
- **EnableReconnect**: Enable automatic reconnection on connection loss (default: false)
|
||||||
|
- **InitialRetryDelay**: Initial delay before first reconnection attempt (default: 100ms)
|
||||||
|
- **MaxReconnectDelay**: Maximum delay between reconnection attempts (default: 30s)
|
||||||
|
- **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates)
|
||||||
|
- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
|
||||||
|
|
||||||
|
### Reconnection Handling
|
||||||
|
|
||||||
|
The load test supports automatic reconnection on connection loss:
|
||||||
|
|
||||||
|
- **Disabled by default**: Connections will fail on any network interruption
|
||||||
|
- **When enabled**: Clients automatically reconnect with exponential backoff
|
||||||
|
- **Exponential backoff**: Starts at `InitialRetryDelay`, doubles on each failure, caps at `MaxReconnectDelay`
|
||||||
|
- **Transparent reconnection**: Message exchange continues after successful reconnection
|
||||||
|
- **Metrics tracking**: Total reconnection count is reported
|
||||||
|
|
||||||
|
**Use cases:**
|
||||||
|
- Testing resilience to network interruptions
|
||||||
|
- Validating server restart behavior
|
||||||
|
- Simulating flaky network conditions
|
||||||
|
- Long-running stability tests
|
||||||
|
|
||||||
|
**Example with reconnection:**
|
||||||
|
```go
|
||||||
|
config := loadtest.LoadTestConfig{
|
||||||
|
ServerURL: "http://localhost:10000",
|
||||||
|
PairsPerSecond: 10,
|
||||||
|
TotalPairs: 20,
|
||||||
|
ExchangeDuration: 10 * time.Minute,
|
||||||
|
EnableReconnect: true,
|
||||||
|
InitialRetryDelay: 100 * time.Millisecond,
|
||||||
|
MaxReconnectDelay: 30 * time.Second,
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Performance Tuning
|
||||||
|
|
||||||
|
When running high-load tests, you may need to adjust the worker pool and buffer sizes:
|
||||||
|
|
||||||
|
- **Default sizing**: Auto-configured based on `PairsPerSecond`
|
||||||
|
- Worker pool: `PairsPerSecond × 2`
|
||||||
|
- Channel buffer: `PairsPerSecond × 4`
|
||||||
|
- **For continuous exchange**: Increase worker pool size (e.g., `PairsPerSecond × 5`)
|
||||||
|
- **For high pair rates** (>50/sec): Increase both worker pool and buffer proportionally
|
||||||
|
- **Signs you need more workers**: Log warnings about "Worker pool saturated"
|
||||||
|
|
||||||
|
Example for 100 pairs/sec with continuous exchange:
|
||||||
|
```go
|
||||||
|
config := LoadTestConfig{
|
||||||
|
PairsPerSecond: 100,
|
||||||
|
WorkerPoolSize: 500, // 5x pairs/sec
|
||||||
|
ChannelBufferSize: 1000, // 10x pairs/sec
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Metrics
|
||||||
|
|
||||||
|
The load test collects and reports:
|
||||||
|
|
||||||
|
- **Total Pairs Sent**: Number of peer pairs attempted
|
||||||
|
- **Successful Exchanges**: Completed message exchanges
|
||||||
|
- **Failed Exchanges**: Failed message exchanges
|
||||||
|
- **Total Messages Exchanged**: Count of successfully exchanged messages
|
||||||
|
- **Total Errors**: Cumulative error count
|
||||||
|
- **Total Reconnections**: Number of automatic reconnections (if enabled)
|
||||||
|
- **Throughput**: Pairs per second (actual)
|
||||||
|
- **Latency Statistics**: Min, Max, Avg message exchange latency
|
||||||
|
|
||||||
|
## Graceful Shutdown Example
|
||||||
|
|
||||||
|
You can interrupt a long-running test at any time with Ctrl+C:
|
||||||
|
|
||||||
|
```
|
||||||
|
./signal-loadtest -server http://localhost:10000 -pairs-per-sec 10 -total-pairs 100 -exchange-duration 10m
|
||||||
|
|
||||||
|
# Press Ctrl+C after some time...
|
||||||
|
^C
|
||||||
|
WARN[0045]
|
||||||
|
Received interrupt signal, shutting down gracefully...
|
||||||
|
|
||||||
|
=== Load Test Report ===
|
||||||
|
Test Duration: 45.234s
|
||||||
|
Total Pairs Sent: 75
|
||||||
|
Successful Exchanges: 75
|
||||||
|
Failed Exchanges: 0
|
||||||
|
Total Messages Exchanged: 22500
|
||||||
|
Total Errors: 0
|
||||||
|
Throughput: 1.66 pairs/sec
|
||||||
|
...
|
||||||
|
========================
|
||||||
|
```
|
||||||
|
|
||||||
|
## Test Results
|
||||||
|
|
||||||
|
Example output from a 20 pairs/sec test:
|
||||||
|
|
||||||
|
```
|
||||||
|
=== Load Test Report ===
|
||||||
|
Test Duration: 5.055249917s
|
||||||
|
Total Pairs Sent: 100
|
||||||
|
Successful Exchanges: 100
|
||||||
|
Failed Exchanges: 0
|
||||||
|
Total Messages Exchanged: 100
|
||||||
|
Total Errors: 0
|
||||||
|
Throughput: 19.78 pairs/sec
|
||||||
|
|
||||||
|
Latency Statistics:
|
||||||
|
Min: 170.375µs
|
||||||
|
Max: 5.176916ms
|
||||||
|
Avg: 441.566µs
|
||||||
|
========================
|
||||||
|
```
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
### Client (`client.go`)
|
||||||
|
- Manages gRPC connection to signal server
|
||||||
|
- Establishes bidirectional stream for receiving messages
|
||||||
|
- Sends messages via `Send` RPC method
|
||||||
|
- Handles message reception asynchronously
|
||||||
|
|
||||||
|
### Load Test Engine (`rate_loadtest.go`)
|
||||||
|
- Worker pool pattern for concurrent peer pairs
|
||||||
|
- Rate-limited pair creation using ticker
|
||||||
|
- Atomic counters for thread-safe metrics collection
|
||||||
|
- Graceful shutdown on context cancellation
|
||||||
|
|
||||||
|
### Test Suite
|
||||||
|
- `loadtest_test.go`: Single pair validation test
|
||||||
|
- `rate_loadtest_test.go`: Multiple rate-based load tests and benchmarks
|
||||||
|
|
||||||
|
## Implementation Details
|
||||||
|
|
||||||
|
### Message Flow
|
||||||
|
1. Create sender and receiver clients with unique IDs
|
||||||
|
2. Both clients connect to signal server via bidirectional stream
|
||||||
|
3. Sender sends encrypted message using `Send` RPC
|
||||||
|
4. Signal server forwards message to receiver's stream
|
||||||
|
5. Receiver reads message from stream
|
||||||
|
6. Validate encrypted body size > 0
|
||||||
|
7. Record latency and success metrics
|
||||||
|
|
||||||
|
### Concurrency
|
||||||
|
- Worker pool size defaults to `PairsPerSecond × 2` (configurable via `WorkerPoolSize`)
|
||||||
|
- Each worker handles multiple peer pairs sequentially
|
||||||
|
- Atomic operations for metrics to avoid lock contention
|
||||||
|
- Channel-based work distribution
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
- [x] TLS/HTTPS support for production servers
|
||||||
|
- [x] Automatic reconnection with exponential backoff
|
||||||
|
- [ ] Ramp-up period implementation
|
||||||
|
- [ ] Percentile latency metrics (p50, p95, p99)
|
||||||
|
- [ ] Connection reuse for multiple messages per pair
|
||||||
|
- [ ] Support for custom message payloads
|
||||||
|
- [ ] CSV/JSON metrics export
|
||||||
|
- [ ] Real-time metrics dashboard
|
||||||
301
signal/loadtest/client.go
Normal file
301
signal/loadtest/client.go
Normal file
@@ -0,0 +1,301 @@
|
|||||||
|
package loadtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client represents a signal client for load testing
type Client struct {
	id        string // unique peer ID, sent to the server in the gRPC metadata header
	serverURL string
	config    *ClientConfig
	conn      *grpc.ClientConn
	client    proto.SignalExchangeClient
	stream    proto.SignalExchange_ConnectStreamClient // guarded by mu; replaced on reconnect
	ctx       context.Context                          // client lifetime; cancelled by Close
	cancel    context.CancelFunc
	msgChannel chan *proto.EncryptedMessage // buffered; receiveMessages publishes, ReceiveMessage consumes

	mu              sync.RWMutex // guards stream, connected, reconnectCount, receiverStarted
	reconnectCount  int64        // total successful reconnections
	connected       bool         // current connection state
	receiverStarted bool         // ensures only one receiveMessages goroutine is ever started
}
|
||||||
|
|
||||||
|
// ClientConfig holds optional configuration for the client
type ClientConfig struct {
	InsecureSkipVerify bool          // skip TLS certificate verification (self-signed certs)
	EnableReconnect    bool          // reconnect automatically on connection loss
	MaxReconnectDelay  time.Duration // backoff cap; defaulted to 30s when reconnect is enabled
	InitialRetryDelay  time.Duration // first backoff delay; defaulted to 100ms when reconnect is enabled
}
|
||||||
|
|
||||||
|
// NewClient creates a new signal client for load testing using the default
// configuration (no TLS-skip, no automatic reconnection).
func NewClient(serverURL, peerID string) (*Client, error) {
	return NewClientWithConfig(serverURL, peerID, nil)
}
|
||||||
|
|
||||||
|
// NewClientWithConfig creates a new signal client with custom TLS configuration
|
||||||
|
func NewClientWithConfig(serverURL, peerID string, config *ClientConfig) (*Client, error) {
|
||||||
|
if config == nil {
|
||||||
|
config = &ClientConfig{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set default reconnect delays if not specified
|
||||||
|
if config.EnableReconnect {
|
||||||
|
if config.InitialRetryDelay == 0 {
|
||||||
|
config.InitialRetryDelay = 100 * time.Millisecond
|
||||||
|
}
|
||||||
|
if config.MaxReconnectDelay == 0 {
|
||||||
|
config.MaxReconnectDelay = 30 * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addr, opts, err := parseServerURL(serverURL, config.InsecureSkipVerify)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse server URL: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := grpc.Dial(addr, opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("dial server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
client := proto.NewSignalExchangeClient(conn)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
return &Client{
|
||||||
|
id: peerID,
|
||||||
|
serverURL: serverURL,
|
||||||
|
config: config,
|
||||||
|
conn: conn,
|
||||||
|
client: client,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
msgChannel: make(chan *proto.EncryptedMessage, 10),
|
||||||
|
connected: false,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect establishes a stream connection to the signal server
|
||||||
|
func (c *Client) Connect() error {
|
||||||
|
md := metadata.New(map[string]string{proto.HeaderId: c.id})
|
||||||
|
ctx := metadata.NewOutgoingContext(c.ctx, md)
|
||||||
|
|
||||||
|
stream, err := c.client.ConnectStream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("connect stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stream.Header(); err != nil {
|
||||||
|
return fmt.Errorf("receive header: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
c.stream = stream
|
||||||
|
c.connected = true
|
||||||
|
if !c.receiverStarted {
|
||||||
|
c.receiverStarted = true
|
||||||
|
c.mu.Unlock()
|
||||||
|
go c.receiveMessages()
|
||||||
|
} else {
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// reconnectStream reconnects the stream without starting a new receiver goroutine
|
||||||
|
func (c *Client) reconnectStream() error {
|
||||||
|
if !c.config.EnableReconnect {
|
||||||
|
return fmt.Errorf("reconnect disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
delay := c.config.InitialRetryDelay
|
||||||
|
attempt := 0
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return c.ctx.Err()
|
||||||
|
case <-time.After(delay):
|
||||||
|
attempt++
|
||||||
|
log.Debugf("Client %s reconnect attempt %d (delay: %v)", c.id, attempt, delay)
|
||||||
|
|
||||||
|
md := metadata.New(map[string]string{proto.HeaderId: c.id})
|
||||||
|
ctx := metadata.NewOutgoingContext(c.ctx, md)
|
||||||
|
|
||||||
|
stream, err := c.client.ConnectStream(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("Client %s reconnect attempt %d failed: %v", c.id, attempt, err)
|
||||||
|
delay *= 2
|
||||||
|
if delay > c.config.MaxReconnectDelay {
|
||||||
|
delay = c.config.MaxReconnectDelay
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := stream.Header(); err != nil {
|
||||||
|
log.Debugf("Client %s reconnect header failed: %v", c.id, err)
|
||||||
|
delay *= 2
|
||||||
|
if delay > c.config.MaxReconnectDelay {
|
||||||
|
delay = c.config.MaxReconnectDelay
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
c.stream = stream
|
||||||
|
c.connected = true
|
||||||
|
c.reconnectCount++
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
log.Debugf("Client %s reconnected successfully (attempt %d, total reconnects: %d)",
|
||||||
|
c.id, attempt, c.reconnectCount)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendMessage sends an encrypted message to a remote peer using the Send RPC
|
||||||
|
func (c *Client) SendMessage(remotePeerID string, body []byte) error {
|
||||||
|
msg := &proto.EncryptedMessage{
|
||||||
|
Key: c.id,
|
||||||
|
RemoteKey: remotePeerID,
|
||||||
|
Body: body,
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
_, err := c.client.Send(ctx, msg)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("send message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReceiveMessage waits for and returns the next message
|
||||||
|
func (c *Client) ReceiveMessage() (*proto.EncryptedMessage, error) {
|
||||||
|
select {
|
||||||
|
case msg := <-c.msgChannel:
|
||||||
|
return msg, nil
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
return nil, c.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the client connection
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
c.cancel()
|
||||||
|
if c.conn != nil {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// receiveMessages runs in its own goroutine (started exactly once by Connect)
// and pumps messages from the gRPC stream into msgChannel. On a receive error
// it marks the client disconnected and, when EnableReconnect is set, blocks in
// reconnectStream before resuming with the replacement stream; otherwise it
// exits. It also exits when the client context is cancelled.
func (c *Client) receiveMessages() {
	for {
		// Snapshot the current stream under the read lock; reconnectStream
		// may replace it between iterations.
		c.mu.RLock()
		stream := c.stream
		c.mu.RUnlock()

		if stream == nil {
			return
		}

		msg, err := stream.Recv()
		if err != nil {
			// Check if context is cancelled before attempting reconnection
			select {
			case <-c.ctx.Done():
				return
			default:
			}

			c.mu.Lock()
			c.connected = false
			c.mu.Unlock()

			log.Debugf("Client %s receive error: %v", c.id, err)

			// Attempt reconnection if enabled
			if c.config.EnableReconnect {
				if reconnectErr := c.reconnectStream(); reconnectErr != nil {
					log.Debugf("Client %s reconnection failed: %v", c.id, reconnectErr)
					return
				}
				// Successfully reconnected, continue receiving
				continue
			}

			// Reconnect disabled, exit
			return
		}

		// Deliver to ReceiveMessage, or bail out if the client is closing.
		select {
		case c.msgChannel <- msg:
		case <-c.ctx.Done():
			return
		}
	}
}
|
||||||
|
|
||||||
|
// IsConnected returns whether the client is currently connected
|
||||||
|
func (c *Client) IsConnected() bool {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.connected
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetReconnectCount returns the number of reconnections
|
||||||
|
func (c *Client) GetReconnectCount() int64 {
|
||||||
|
c.mu.RLock()
|
||||||
|
defer c.mu.RUnlock()
|
||||||
|
return c.reconnectCount
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseServerURL(serverURL string, insecureSkipVerify bool) (string, []grpc.DialOption, error) {
|
||||||
|
serverURL = strings.TrimSpace(serverURL)
|
||||||
|
if serverURL == "" {
|
||||||
|
return "", nil, fmt.Errorf("server URL is empty")
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr string
|
||||||
|
var opts []grpc.DialOption
|
||||||
|
|
||||||
|
if strings.HasPrefix(serverURL, "https://") {
|
||||||
|
addr = strings.TrimPrefix(serverURL, "https://")
|
||||||
|
tlsConfig := &tls.Config{
|
||||||
|
MinVersion: tls.VersionTLS12,
|
||||||
|
InsecureSkipVerify: insecureSkipVerify,
|
||||||
|
}
|
||||||
|
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
|
||||||
|
} else if strings.HasPrefix(serverURL, "http://") {
|
||||||
|
addr = strings.TrimPrefix(serverURL, "http://")
|
||||||
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
} else {
|
||||||
|
addr = serverURL
|
||||||
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
}
|
||||||
|
|
||||||
|
if !strings.Contains(addr, ":") {
|
||||||
|
return "", nil, fmt.Errorf("server URL must include port")
|
||||||
|
}
|
||||||
|
|
||||||
|
return addr, opts, nil
|
||||||
|
}
|
||||||
128
signal/loadtest/cmd/signal-loadtest/integration_test.go
Normal file
128
signal/loadtest/cmd/signal-loadtest/integration_test.go
Normal file
@@ -0,0 +1,128 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"os"
|
||||||
|
"os/exec"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
|
"github.com/netbirdio/netbird/signal/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestCLI_SingleMessage runs the load-test binary via `go run` against an
// embedded signal server and asserts the final report shows 5 pairs, all of
// which exchanged a single message successfully.
func TestCLI_SingleMessage(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	grpcServer, serverAddr := startTestSignalServer(t, ctx)
	defer grpcServer.Stop()

	// Single-message mode: no -exchange-duration flag.
	cmd := exec.Command("go", "run", "main.go",
		"-server", serverAddr,
		"-pairs-per-sec", "3",
		"-total-pairs", "5",
		"-message-size", "50",
		"-log-level", "warn")

	output, err := cmd.CombinedOutput()
	require.NoError(t, err, "CLI should execute successfully")

	outputStr := string(output)
	require.Contains(t, outputStr, "Load Test Report")
	require.Contains(t, outputStr, "Total Pairs Sent: 5")
	require.Contains(t, outputStr, "Successful Exchanges: 5")
	t.Logf("Output:\n%s", outputStr)
}
|
||||||
|
|
||||||
|
// TestCLI_ContinuousExchange runs the load-test binary in continuous mode
// (3s exchange per pair, 100ms message interval) against an embedded signal
// server and asserts all 3 pairs complete successfully. Skipped with -short.
func TestCLI_ContinuousExchange(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping continuous exchange CLI test in short mode")
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	grpcServer, serverAddr := startTestSignalServer(t, ctx)
	defer grpcServer.Stop()

	cmd := exec.Command("go", "run", "main.go",
		"-server", serverAddr,
		"-pairs-per-sec", "2",
		"-total-pairs", "3",
		"-message-size", "100",
		"-exchange-duration", "3s",
		"-message-interval", "100ms",
		"-log-level", "warn")

	output, err := cmd.CombinedOutput()
	require.NoError(t, err, "CLI should execute successfully")

	outputStr := string(output)
	require.Contains(t, outputStr, "Load Test Report")
	require.Contains(t, outputStr, "Total Pairs Sent: 3")
	require.Contains(t, outputStr, "Successful Exchanges: 3")
	t.Logf("Output:\n%s", outputStr)
}
|
||||||
|
|
||||||
|
func TestCLI_InvalidConfig(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "negative pairs",
|
||||||
|
args: []string{"-pairs-per-sec", "-1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zero total pairs",
|
||||||
|
args: []string{"-total-pairs", "0"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "negative message size",
|
||||||
|
args: []string{"-message-size", "-100"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
args := append([]string{"run", "main.go"}, tt.args...)
|
||||||
|
cmd := exec.Command("go", args...)
|
||||||
|
output, err := cmd.CombinedOutput()
|
||||||
|
require.Error(t, err, "Should fail with invalid config")
|
||||||
|
require.Contains(t, string(output), "Configuration error")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startTestSignalServer(t *testing.T, ctx context.Context) (*grpc.Server, string) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
|
signalServer, err := server.NewServer(ctx, otel.Meter("cli-test"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := grpcServer.Serve(listener); err != nil {
|
||||||
|
t.Logf("Server stopped: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestMain is the standard test entry point; it performs no extra setup and
// simply runs the test suite, exiting with its status code.
func TestMain(m *testing.M) {
	os.Exit(m.Run())
}
|
||||||
165
signal/loadtest/cmd/signal-loadtest/main.go
Normal file
165
signal/loadtest/cmd/signal-loadtest/main.go
Normal file
@@ -0,0 +1,165 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/signal/loadtest"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Command-line flag values; registered in init and read by main.
var (
	serverURL          string        // -server: signal server URL (http:// or https://)
	pairsPerSecond     int           // -pairs-per-sec: pair creation rate
	totalPairs         int           // -total-pairs: total pairs to create
	messageSize        int           // -message-size: payload size in bytes
	testDuration       time.Duration // -test-duration: overall cap, 0 = unlimited
	exchangeDuration   time.Duration // -exchange-duration: per-pair continuous mode, 0 = single message
	messageInterval    time.Duration // -message-interval: delay between messages in continuous mode
	insecureSkipVerify bool          // -insecure-skip-verify: skip TLS cert verification
	workerPoolSize     int           // -worker-pool-size: 0 = auto
	channelBufferSize  int           // -channel-buffer-size: 0 = auto
	reportInterval     int           // -report-interval: progress report every N messages
	logLevel           string        // -log-level: trace/debug/info/warn/error
	enableReconnect    bool          // -enable-reconnect: auto-reconnect on connection loss
	maxReconnectDelay  time.Duration // -max-reconnect-delay: backoff cap
	initialRetryDelay  time.Duration // -initial-retry-delay: first backoff delay
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
flag.StringVar(&serverURL, "server", "http://localhost:10000", "Signal server URL (http:// or https://)")
|
||||||
|
flag.IntVar(&pairsPerSecond, "pairs-per-sec", 10, "Number of peer pairs to create per second")
|
||||||
|
flag.IntVar(&totalPairs, "total-pairs", 100, "Total number of peer pairs to create")
|
||||||
|
flag.IntVar(&messageSize, "message-size", 100, "Size of test message in bytes")
|
||||||
|
flag.DurationVar(&testDuration, "test-duration", 0, "Maximum test duration (0 = unlimited)")
|
||||||
|
flag.DurationVar(&exchangeDuration, "exchange-duration", 0, "Duration for continuous message exchange per pair (0 = single message)")
|
||||||
|
flag.DurationVar(&messageInterval, "message-interval", 100*time.Millisecond, "Interval between messages in continuous mode")
|
||||||
|
flag.BoolVar(&insecureSkipVerify, "insecure-skip-verify", false, "Skip TLS certificate verification (use with self-signed certificates)")
|
||||||
|
flag.IntVar(&workerPoolSize, "worker-pool-size", 0, "Number of worker goroutines (0 = auto: pairs-per-sec * 2)")
|
||||||
|
flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)")
|
||||||
|
flag.IntVar(&reportInterval, "report-interval", 10000, "Report progress every N messages (0 = no periodic reports)")
|
||||||
|
flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)")
|
||||||
|
flag.BoolVar(&enableReconnect, "enable-reconnect", true, "Enable automatic reconnection on connection loss")
|
||||||
|
flag.DurationVar(&maxReconnectDelay, "max-reconnect-delay", 30*time.Second, "Maximum delay between reconnection attempts")
|
||||||
|
flag.DurationVar(&initialRetryDelay, "initial-retry-delay", 100*time.Millisecond, "Initial delay before first reconnection attempt")
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
level, err := log.ParseLevel(logLevel)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Invalid log level: %v\n", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
log.SetLevel(level)
|
||||||
|
|
||||||
|
config := loadtest.LoadTestConfig{
|
||||||
|
ServerURL: serverURL,
|
||||||
|
PairsPerSecond: pairsPerSecond,
|
||||||
|
TotalPairs: totalPairs,
|
||||||
|
MessageSize: messageSize,
|
||||||
|
TestDuration: testDuration,
|
||||||
|
ExchangeDuration: exchangeDuration,
|
||||||
|
MessageInterval: messageInterval,
|
||||||
|
InsecureSkipVerify: insecureSkipVerify,
|
||||||
|
WorkerPoolSize: workerPoolSize,
|
||||||
|
ChannelBufferSize: channelBufferSize,
|
||||||
|
ReportInterval: reportInterval,
|
||||||
|
EnableReconnect: enableReconnect,
|
||||||
|
MaxReconnectDelay: maxReconnectDelay,
|
||||||
|
InitialRetryDelay: initialRetryDelay,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateConfig(config); err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "Configuration error: %v\n", err)
|
||||||
|
flag.Usage()
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Signal Load Test Configuration:")
|
||||||
|
log.Infof(" Server URL: %s", config.ServerURL)
|
||||||
|
log.Infof(" Pairs per second: %d", config.PairsPerSecond)
|
||||||
|
log.Infof(" Total pairs: %d", config.TotalPairs)
|
||||||
|
log.Infof(" Message size: %d bytes", config.MessageSize)
|
||||||
|
if config.InsecureSkipVerify {
|
||||||
|
log.Warnf(" TLS certificate verification: DISABLED (insecure)")
|
||||||
|
}
|
||||||
|
if config.TestDuration > 0 {
|
||||||
|
log.Infof(" Test duration: %v", config.TestDuration)
|
||||||
|
}
|
||||||
|
if config.ExchangeDuration > 0 {
|
||||||
|
log.Infof(" Exchange duration: %v", config.ExchangeDuration)
|
||||||
|
log.Infof(" Message interval: %v", config.MessageInterval)
|
||||||
|
} else {
|
||||||
|
log.Infof(" Mode: Single message exchange")
|
||||||
|
}
|
||||||
|
if config.EnableReconnect {
|
||||||
|
log.Infof(" Reconnection: ENABLED")
|
||||||
|
log.Infof(" Initial retry delay: %v", config.InitialRetryDelay)
|
||||||
|
log.Infof(" Max reconnect delay: %v", config.MaxReconnectDelay)
|
||||||
|
}
|
||||||
|
fmt.Println()
|
||||||
|
|
||||||
|
// Set up signal handler for graceful shutdown
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
sigChan := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
|
||||||
|
|
||||||
|
lt := loadtest.NewLoadTestWithContext(ctx, config)
|
||||||
|
|
||||||
|
// Run load test in a goroutine
|
||||||
|
done := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
done <- lt.Run()
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Wait for completion or signal
|
||||||
|
select {
|
||||||
|
case <-sigChan:
|
||||||
|
log.Warnf("\nReceived interrupt signal, shutting down gracefully...")
|
||||||
|
cancel()
|
||||||
|
// Wait a bit for graceful shutdown
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
log.Warnf("Shutdown timeout, forcing exit")
|
||||||
|
}
|
||||||
|
case err := <-done:
|
||||||
|
if err != nil && err != context.Canceled {
|
||||||
|
log.Errorf("Load test failed: %v", err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics := lt.GetMetrics()
|
||||||
|
fmt.Println() // Add blank line before report
|
||||||
|
metrics.PrintReport()
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateConfig(config loadtest.LoadTestConfig) error {
|
||||||
|
if config.ServerURL == "" {
|
||||||
|
return fmt.Errorf("server URL is required")
|
||||||
|
}
|
||||||
|
if config.PairsPerSecond <= 0 {
|
||||||
|
return fmt.Errorf("pairs-per-sec must be greater than 0")
|
||||||
|
}
|
||||||
|
if config.TotalPairs <= 0 {
|
||||||
|
return fmt.Errorf("total-pairs must be greater than 0")
|
||||||
|
}
|
||||||
|
if config.MessageSize <= 0 {
|
||||||
|
return fmt.Errorf("message-size must be greater than 0")
|
||||||
|
}
|
||||||
|
if config.MessageInterval <= 0 {
|
||||||
|
return fmt.Errorf("message-interval must be greater than 0")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
40
signal/loadtest/cmd/signal-loadtest/test.sh
Normal file
40
signal/loadtest/cmd/signal-loadtest/test.sh
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
#!/bin/bash
# Smoke tests for the signal-loadtest binary. Expects a signal server to be
# listening locally on port 10000. Aborts on the first failing command.
set -e

echo "Building signal-loadtest binary..."
go build -o signal-loadtest

echo ""
echo "=== Test 1: Single message exchange (5 pairs) ==="
./signal-loadtest \
    -server http://localhost:10000 \
    -pairs-per-sec 5 \
    -total-pairs 5 \
    -message-size 50 \
    -log-level info

echo ""
echo "=== Test 2: Continuous exchange (3 pairs, 5 seconds) ==="
./signal-loadtest \
    -server http://localhost:10000 \
    -pairs-per-sec 3 \
    -total-pairs 3 \
    -message-size 100 \
    -exchange-duration 5s \
    -message-interval 200ms \
    -log-level info

echo ""
echo "=== Test 3: Progress reporting (10 pairs, 10s, report every 100 messages) ==="
./signal-loadtest \
    -server http://localhost:10000 \
    -pairs-per-sec 10 \
    -total-pairs 10 \
    -message-size 100 \
    -exchange-duration 10s \
    -message-interval 100ms \
    -report-interval 100 \
    -log-level info

echo ""
echo "All tests completed!"
|
||||||
91
signal/loadtest/loadtest_test.go
Normal file
91
signal/loadtest/loadtest_test.go
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
package loadtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
|
"github.com/netbirdio/netbird/signal/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSignalLoadTest_SinglePair(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServer(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
sender, err := NewClient(serverAddr, "sender-peer-id")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer sender.Close()
|
||||||
|
|
||||||
|
receiver, err := NewClient(serverAddr, "receiver-peer-id")
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer receiver.Close()
|
||||||
|
|
||||||
|
err = sender.Connect()
|
||||||
|
require.NoError(t, err, "Sender should connect successfully")
|
||||||
|
|
||||||
|
err = receiver.Connect()
|
||||||
|
require.NoError(t, err, "Receiver should connect successfully")
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
testMessage := []byte("test message payload")
|
||||||
|
|
||||||
|
t.Log("Sending message from sender to receiver")
|
||||||
|
err = sender.SendMessage("receiver-peer-id", testMessage)
|
||||||
|
require.NoError(t, err, "Sender should send message successfully")
|
||||||
|
|
||||||
|
t.Log("Waiting for receiver to receive message")
|
||||||
|
|
||||||
|
receiveDone := make(chan struct{})
|
||||||
|
var msg *proto.EncryptedMessage
|
||||||
|
var receiveErr error
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
msg, receiveErr = receiver.ReceiveMessage()
|
||||||
|
close(receiveDone)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-receiveDone:
|
||||||
|
require.NoError(t, receiveErr, "Receiver should receive message")
|
||||||
|
require.NotNil(t, msg, "Received message should not be nil")
|
||||||
|
require.Greater(t, len(msg.Body), 0, "Encrypted message body size should be greater than 0")
|
||||||
|
require.Equal(t, "sender-peer-id", msg.Key)
|
||||||
|
require.Equal(t, "receiver-peer-id", msg.RemoteKey)
|
||||||
|
t.Log("Message received successfully")
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func startTestSignalServer(t *testing.T, ctx context.Context) (*grpc.Server, string) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
|
signalServer, err := server.NewServer(ctx, otel.Meter("test"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := grpcServer.Serve(listener); err != nil {
|
||||||
|
t.Logf("Server stopped: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||||
|
}
|
||||||
461
signal/loadtest/rate_loadtest.go
Normal file
461
signal/loadtest/rate_loadtest.go
Normal file
@@ -0,0 +1,461 @@
|
|||||||
|
package loadtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LoadTestConfig configuration for the load test.
type LoadTestConfig struct {
	IDPrefix           string        // prefix for generated peer IDs (managed by the constructor)
	ServerURL          string        // signal server URL (http:// or https://)
	PairsPerSecond     int           // rate at which new peer pairs are started
	TotalPairs         int           // total number of peer pairs to run
	MessageSize        int           // payload size in bytes
	TestDuration       time.Duration // overall cap on the run (0 = unlimited)
	ExchangeDuration   time.Duration // per-pair continuous exchange time (0 = single message)
	MessageInterval    time.Duration // delay between messages in continuous mode
	RampUpDuration     time.Duration // NOTE(review): not referenced in this file — confirm usage elsewhere
	InsecureSkipVerify bool          // skip TLS certificate verification
	WorkerPoolSize     int           // worker goroutines (0 = auto: PairsPerSecond * 2)
	ChannelBufferSize  int           // pair channel buffer (0 = auto: PairsPerSecond * 4)
	ReportInterval     int           // Report progress every N messages (0 = no periodic reports)
	EnableReconnect    bool          // enable client auto-reconnect on connection loss
	MaxReconnectDelay  time.Duration // upper bound on the reconnect backoff
	InitialRetryDelay  time.Duration // delay before the first reconnect attempt
}
|
||||||
|
|
||||||
|
// LoadTestMetrics metrics collected during the load test. The atomic
// counters are updated concurrently by the workers; the latency slice and
// timestamps are guarded by mu / written only by Run.
type LoadTestMetrics struct {
	TotalPairsSent         atomic.Int64 // pairs fully processed (success or failure)
	TotalMessagesExchanged atomic.Int64 // messages received with a non-empty body
	TotalErrors            atomic.Int64 // individual send/exchange errors
	SuccessfulExchanges    atomic.Int64 // pair exchanges that completed without error
	FailedExchanges        atomic.Int64 // pair exchanges that returned an error
	ActivePairs            atomic.Int64 // pairs currently executing
	TotalReconnections     atomic.Int64 // client reconnect counts, summed on client close

	mu        sync.Mutex      // guards latencies
	latencies []time.Duration // individual latency samples
	startTime time.Time       // set when Run begins
	endTime   time.Time       // set when Run returns
}
|
||||||
|
|
||||||
|
// PeerPair represents a sender-receiver pair
|
||||||
|
type PeerPair struct {
|
||||||
|
sender *Client
|
||||||
|
receiver *Client
|
||||||
|
pairID int
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadTest manages the load test execution
|
||||||
|
type LoadTest struct {
|
||||||
|
config LoadTestConfig
|
||||||
|
metrics *LoadTestMetrics
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
|
reporterCtx context.Context
|
||||||
|
reporterCancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLoadTest creates a new load test instance
|
||||||
|
func NewLoadTest(config LoadTestConfig) *LoadTest {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
return newLoadTestWithContext(ctx, cancel, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLoadTestWithContext creates a new load test instance with a custom context
|
||||||
|
func NewLoadTestWithContext(ctx context.Context, config LoadTestConfig) *LoadTest {
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
return newLoadTestWithContext(ctx, cancel, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLoadTestWithContext(ctx context.Context, cancel context.CancelFunc, config LoadTestConfig) *LoadTest {
|
||||||
|
reporterCtx, reporterCancel := context.WithCancel(ctx)
|
||||||
|
config.IDPrefix = fmt.Sprintf("%d-", time.Now().UnixNano())
|
||||||
|
return &LoadTest{
|
||||||
|
config: config,
|
||||||
|
metrics: &LoadTestMetrics{},
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
|
reporterCtx: reporterCtx,
|
||||||
|
reporterCancel: reporterCancel,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run executes the load test
|
||||||
|
func (lt *LoadTest) Run() error {
|
||||||
|
lt.metrics.startTime = time.Now()
|
||||||
|
defer func() {
|
||||||
|
lt.metrics.endTime = time.Now()
|
||||||
|
}()
|
||||||
|
|
||||||
|
exchangeInfo := "single message"
|
||||||
|
if lt.config.ExchangeDuration > 0 {
|
||||||
|
exchangeInfo = fmt.Sprintf("continuous for %v", lt.config.ExchangeDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
workerPoolSize := lt.config.WorkerPoolSize
|
||||||
|
if workerPoolSize == 0 {
|
||||||
|
workerPoolSize = lt.config.PairsPerSecond * 2
|
||||||
|
}
|
||||||
|
|
||||||
|
channelBufferSize := lt.config.ChannelBufferSize
|
||||||
|
if channelBufferSize == 0 {
|
||||||
|
channelBufferSize = lt.config.PairsPerSecond * 4
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes, exchange: %s",
|
||||||
|
lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize, exchangeInfo)
|
||||||
|
log.Infof("Worker pool size: %d, channel buffer: %d", workerPoolSize, channelBufferSize)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var reporterWg sync.WaitGroup
|
||||||
|
pairChan := make(chan int, channelBufferSize)
|
||||||
|
|
||||||
|
// Start progress reporter if configured
|
||||||
|
if lt.config.ReportInterval > 0 {
|
||||||
|
reporterWg.Add(1)
|
||||||
|
go lt.progressReporter(&reporterWg, lt.config.ReportInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < workerPoolSize; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go lt.pairWorker(&wg, pairChan)
|
||||||
|
}
|
||||||
|
|
||||||
|
testCtx := lt.ctx
|
||||||
|
if lt.config.TestDuration > 0 {
|
||||||
|
var testCancel context.CancelFunc
|
||||||
|
testCtx, testCancel = context.WithTimeout(lt.ctx, lt.config.TestDuration)
|
||||||
|
defer testCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
ticker := time.NewTicker(time.Second / time.Duration(lt.config.PairsPerSecond))
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
pairsCreated := 0
|
||||||
|
for pairsCreated < lt.config.TotalPairs {
|
||||||
|
select {
|
||||||
|
case <-testCtx.Done():
|
||||||
|
log.Infof("Test duration reached or context cancelled")
|
||||||
|
close(pairChan)
|
||||||
|
wg.Wait()
|
||||||
|
return testCtx.Err()
|
||||||
|
case <-ticker.C:
|
||||||
|
select {
|
||||||
|
case pairChan <- pairsCreated:
|
||||||
|
pairsCreated++
|
||||||
|
default:
|
||||||
|
log.Warnf("Worker pool saturated, skipping pair creation")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("All %d pairs queued, waiting for completion...", pairsCreated)
|
||||||
|
close(pairChan)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Cancel progress reporter context after all work is done and wait for it
|
||||||
|
lt.reporterCancel()
|
||||||
|
reporterWg.Wait()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
for pairID := range pairChan {
|
||||||
|
lt.metrics.ActivePairs.Add(1)
|
||||||
|
if err := lt.executePairExchange(pairID); err != nil {
|
||||||
|
lt.metrics.TotalErrors.Add(1)
|
||||||
|
lt.metrics.FailedExchanges.Add(1)
|
||||||
|
log.Debugf("Pair %d exchange failed: %v", pairID, err)
|
||||||
|
} else {
|
||||||
|
lt.metrics.SuccessfulExchanges.Add(1)
|
||||||
|
}
|
||||||
|
lt.metrics.ActivePairs.Add(-1)
|
||||||
|
lt.metrics.TotalPairsSent.Add(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) executePairExchange(pairID int) error {
|
||||||
|
senderID := fmt.Sprintf("%ssender-%d", lt.config.IDPrefix, pairID)
|
||||||
|
receiverID := fmt.Sprintf("%sreceiver-%d", lt.config.IDPrefix, pairID)
|
||||||
|
|
||||||
|
clientConfig := &ClientConfig{
|
||||||
|
InsecureSkipVerify: lt.config.InsecureSkipVerify,
|
||||||
|
EnableReconnect: lt.config.EnableReconnect,
|
||||||
|
MaxReconnectDelay: lt.config.MaxReconnectDelay,
|
||||||
|
InitialRetryDelay: lt.config.InitialRetryDelay,
|
||||||
|
}
|
||||||
|
|
||||||
|
sender, err := NewClientWithConfig(lt.config.ServerURL, senderID, clientConfig)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create sender: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
sender.Close()
|
||||||
|
lt.metrics.TotalReconnections.Add(sender.GetReconnectCount())
|
||||||
|
}()
|
||||||
|
|
||||||
|
receiver, err := NewClientWithConfig(lt.config.ServerURL, receiverID, clientConfig)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create receiver: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
receiver.Close()
|
||||||
|
lt.metrics.TotalReconnections.Add(receiver.GetReconnectCount())
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err := sender.Connect(); err != nil {
|
||||||
|
return fmt.Errorf("sender connect: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := receiver.Connect(); err != nil {
|
||||||
|
return fmt.Errorf("receiver connect: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(50 * time.Millisecond)
|
||||||
|
|
||||||
|
testMessage := make([]byte, lt.config.MessageSize)
|
||||||
|
for i := range testMessage {
|
||||||
|
testMessage[i] = byte(i % 256)
|
||||||
|
}
|
||||||
|
|
||||||
|
if lt.config.ExchangeDuration > 0 {
|
||||||
|
return lt.continuousExchange(pairID, sender, receiver, receiverID, testMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
return lt.singleExchange(sender, receiver, receiverID, testMessage)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) singleExchange(sender, receiver *Client, receiverID string, testMessage []byte) error {
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
|
if err := sender.SendMessage(receiverID, testMessage); err != nil {
|
||||||
|
return fmt.Errorf("send message: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
receiveDone := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
msg, err := receiver.ReceiveMessage()
|
||||||
|
if err != nil {
|
||||||
|
receiveDone <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(msg.Body) == 0 {
|
||||||
|
receiveDone <- fmt.Errorf("empty message body")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
receiveDone <- nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-receiveDone:
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("receive message: %w", err)
|
||||||
|
}
|
||||||
|
latency := time.Since(startTime)
|
||||||
|
lt.recordLatency(latency)
|
||||||
|
lt.metrics.TotalMessagesExchanged.Add(1)
|
||||||
|
return nil
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
return fmt.Errorf("timeout waiting for message")
|
||||||
|
case <-lt.ctx.Done():
|
||||||
|
return lt.ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) continuousExchange(pairID int, sender, receiver *Client, receiverID string, testMessage []byte) error {
|
||||||
|
exchangeCtx, cancel := context.WithTimeout(lt.ctx, lt.config.ExchangeDuration)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
messageInterval := lt.config.MessageInterval
|
||||||
|
if messageInterval == 0 {
|
||||||
|
messageInterval = 100 * time.Millisecond
|
||||||
|
}
|
||||||
|
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := lt.receiverLoop(exchangeCtx, receiver, pairID); err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||||
|
select {
|
||||||
|
case errChan <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
if err := lt.senderLoop(exchangeCtx, sender, receiverID, testMessage, messageInterval); err != nil && err != context.DeadlineExceeded && err != context.Canceled {
|
||||||
|
select {
|
||||||
|
case errChan <- err:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errChan:
|
||||||
|
return err
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) senderLoop(ctx context.Context, sender *Client, receiverID string, message []byte, interval time.Duration) error {
|
||||||
|
ticker := time.NewTicker(interval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-ticker.C:
|
||||||
|
startTime := time.Now()
|
||||||
|
if err := sender.SendMessage(receiverID, message); err != nil {
|
||||||
|
lt.metrics.TotalErrors.Add(1)
|
||||||
|
log.Debugf("Send error: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
lt.recordLatency(time.Since(startTime))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) receiverLoop(ctx context.Context, receiver *Client, pairID int) error {
|
||||||
|
for {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case msg, ok := <-receiver.msgChannel:
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if len(msg.Body) > 0 {
|
||||||
|
lt.metrics.TotalMessagesExchanged.Add(1)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lt *LoadTest) recordLatency(latency time.Duration) {
|
||||||
|
lt.metrics.mu.Lock()
|
||||||
|
defer lt.metrics.mu.Unlock()
|
||||||
|
lt.metrics.latencies = append(lt.metrics.latencies, latency)
|
||||||
|
}
|
||||||
|
|
||||||
|
// progressReporter prints periodic progress reports
|
||||||
|
func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
lastReported := int64(0)
|
||||||
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-lt.reporterCtx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
currentMessages := lt.metrics.TotalMessagesExchanged.Load()
|
||||||
|
if currentMessages-lastReported >= int64(interval) {
|
||||||
|
elapsed := time.Since(lt.metrics.startTime)
|
||||||
|
activePairs := lt.metrics.ActivePairs.Load()
|
||||||
|
errors := lt.metrics.TotalErrors.Load()
|
||||||
|
reconnections := lt.metrics.TotalReconnections.Load()
|
||||||
|
|
||||||
|
var msgRate float64
|
||||||
|
if elapsed.Seconds() > 0 {
|
||||||
|
msgRate = float64(currentMessages) / elapsed.Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %d reconnections, %.2f msg/sec, elapsed: %v",
|
||||||
|
currentMessages, activePairs, errors, reconnections, msgRate, elapsed.Round(time.Second))
|
||||||
|
|
||||||
|
lastReported = (currentMessages / int64(interval)) * int64(interval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the load test
|
||||||
|
func (lt *LoadTest) Stop() {
|
||||||
|
lt.cancel()
|
||||||
|
lt.reporterCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetMetrics returns the collected metrics
|
||||||
|
func (lt *LoadTest) GetMetrics() *LoadTestMetrics {
|
||||||
|
return lt.metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// PrintReport prints a summary report of the test results
|
||||||
|
func (m *LoadTestMetrics) PrintReport() {
|
||||||
|
duration := m.endTime.Sub(m.startTime)
|
||||||
|
|
||||||
|
fmt.Println("\n=== Load Test Report ===")
|
||||||
|
fmt.Printf("Test Duration: %v\n", duration)
|
||||||
|
fmt.Printf("Total Pairs Sent: %d\n", m.TotalPairsSent.Load())
|
||||||
|
fmt.Printf("Successful Exchanges: %d\n", m.SuccessfulExchanges.Load())
|
||||||
|
fmt.Printf("Failed Exchanges: %d\n", m.FailedExchanges.Load())
|
||||||
|
fmt.Printf("Total Messages Exchanged: %d\n", m.TotalMessagesExchanged.Load())
|
||||||
|
fmt.Printf("Total Errors: %d\n", m.TotalErrors.Load())
|
||||||
|
|
||||||
|
reconnections := m.TotalReconnections.Load()
|
||||||
|
if reconnections > 0 {
|
||||||
|
fmt.Printf("Total Reconnections: %d\n", reconnections)
|
||||||
|
}
|
||||||
|
|
||||||
|
if duration.Seconds() > 0 {
|
||||||
|
throughput := float64(m.SuccessfulExchanges.Load()) / duration.Seconds()
|
||||||
|
fmt.Printf("Throughput: %.2f pairs/sec\n", throughput)
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
latencies := m.latencies
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
if len(latencies) > 0 {
|
||||||
|
var total time.Duration
|
||||||
|
minLatency := latencies[0]
|
||||||
|
maxLatency := latencies[0]
|
||||||
|
|
||||||
|
for _, lat := range latencies {
|
||||||
|
total += lat
|
||||||
|
if lat < minLatency {
|
||||||
|
minLatency = lat
|
||||||
|
}
|
||||||
|
if lat > maxLatency {
|
||||||
|
maxLatency = lat
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
avg := total / time.Duration(len(latencies))
|
||||||
|
fmt.Printf("\nLatency Statistics:\n")
|
||||||
|
fmt.Printf(" Min: %v\n", minLatency)
|
||||||
|
fmt.Printf(" Max: %v\n", maxLatency)
|
||||||
|
fmt.Printf(" Avg: %v\n", avg)
|
||||||
|
}
|
||||||
|
fmt.Println("========================")
|
||||||
|
}
|
||||||
305
signal/loadtest/rate_loadtest_test.go
Normal file
305
signal/loadtest/rate_loadtest_test.go
Normal file
@@ -0,0 +1,305 @@
|
|||||||
|
package loadtest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
|
"github.com/netbirdio/netbird/signal/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestLoadTest_10PairsPerSecond(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping load test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 10,
|
||||||
|
TotalPairs: 50,
|
||||||
|
MessageSize: 100,
|
||||||
|
TestDuration: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(50), metrics.TotalPairsSent.Load(), "Should send all 50 pairs")
|
||||||
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
||||||
|
require.Equal(t, metrics.TotalMessagesExchanged.Load(), metrics.SuccessfulExchanges.Load(), "Messages exchanged should match successful exchanges")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_20PairsPerSecond(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping load test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 20,
|
||||||
|
TotalPairs: 100,
|
||||||
|
MessageSize: 500,
|
||||||
|
TestDuration: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(100), metrics.TotalPairsSent.Load(), "Should send all 100 pairs")
|
||||||
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_SmallBurst(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 5,
|
||||||
|
TotalPairs: 10,
|
||||||
|
MessageSize: 50,
|
||||||
|
TestDuration: 10 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
|
||||||
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(5), "At least 50% success rate")
|
||||||
|
require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_ContinuousExchange_30Seconds(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping continuous exchange test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 5,
|
||||||
|
TotalPairs: 10,
|
||||||
|
MessageSize: 100,
|
||||||
|
ExchangeDuration: 30 * time.Second,
|
||||||
|
MessageInterval: 100 * time.Millisecond,
|
||||||
|
TestDuration: 2 * time.Minute,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
|
||||||
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(2000), "Should exchange many messages over 30 seconds")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_ContinuousExchange_10Minutes(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("Skipping long continuous exchange test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 10,
|
||||||
|
TotalPairs: 20,
|
||||||
|
MessageSize: 200,
|
||||||
|
ExchangeDuration: 10 * time.Minute,
|
||||||
|
MessageInterval: 200 * time.Millisecond,
|
||||||
|
TestDuration: 15 * time.Minute,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(20), metrics.TotalPairsSent.Load())
|
||||||
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(50000), "Should exchange many messages over 10 minutes")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 3,
|
||||||
|
TotalPairs: 5,
|
||||||
|
MessageSize: 50,
|
||||||
|
ExchangeDuration: 3 * time.Second,
|
||||||
|
MessageInterval: 100 * time.Millisecond,
|
||||||
|
TestDuration: 10 * time.Second,
|
||||||
|
ReportInterval: 50, // Report every 50 messages for testing
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
|
||||||
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(100), "Should exchange multiple messages in 3 seconds")
|
||||||
|
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestLoadTest_ReconnectionConfig(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 3,
|
||||||
|
TotalPairs: 5,
|
||||||
|
MessageSize: 50,
|
||||||
|
ExchangeDuration: 2 * time.Second,
|
||||||
|
MessageInterval: 200 * time.Millisecond,
|
||||||
|
TestDuration: 5 * time.Second,
|
||||||
|
EnableReconnect: true,
|
||||||
|
InitialRetryDelay: 100 * time.Millisecond,
|
||||||
|
MaxReconnectDelay: 2 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
err := loadTest.Run()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
metrics.PrintReport()
|
||||||
|
|
||||||
|
// Test should complete successfully with reconnection enabled
|
||||||
|
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
|
||||||
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(0), "Should have exchanged messages")
|
||||||
|
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
|
||||||
|
|
||||||
|
// Reconnections counter should exist (even if zero for this stable test)
|
||||||
|
reconnections := metrics.TotalReconnections.Load()
|
||||||
|
require.GreaterOrEqual(t, reconnections, int64(0), "Reconnections metric should be tracked")
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkLoadTest_Throughput(b *testing.B) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
grpcServer, serverAddr := startBenchSignalServer(b, ctx)
|
||||||
|
defer grpcServer.Stop()
|
||||||
|
|
||||||
|
b.Run("5pairs-per-sec", func(b *testing.B) {
|
||||||
|
config := LoadTestConfig{
|
||||||
|
ServerURL: serverAddr,
|
||||||
|
PairsPerSecond: 5,
|
||||||
|
TotalPairs: b.N,
|
||||||
|
MessageSize: 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
loadTest := NewLoadTest(config)
|
||||||
|
b.ResetTimer()
|
||||||
|
_ = loadTest.Run()
|
||||||
|
b.StopTimer()
|
||||||
|
|
||||||
|
metrics := loadTest.GetMetrics()
|
||||||
|
b.ReportMetric(float64(metrics.SuccessfulExchanges.Load()), "successful")
|
||||||
|
b.ReportMetric(float64(metrics.FailedExchanges.Load()), "failed")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func startTestSignalServerForLoad(t *testing.T, ctx context.Context) (*grpc.Server, string) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
|
signalServer, err := server.NewServer(ctx, otel.Meter("test"))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := grpcServer.Serve(listener); err != nil {
|
||||||
|
t.Logf("Server stopped: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func startBenchSignalServer(b *testing.B, ctx context.Context) (*grpc.Server, string) {
|
||||||
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
|
||||||
|
signalServer, err := server.NewServer(ctx, otel.Meter("bench"))
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := grpcServer.Serve(listener); err != nil {
|
||||||
|
b.Logf("Server stopped: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user