add test for time

This commit is contained in:
Maycon Santos
2025-10-10 00:26:44 +02:00
parent d1153b5b5d
commit 2288664fe7
4 changed files with 258 additions and 21 deletions

View File

@@ -5,9 +5,15 @@ Load testing tool for the NetBird signal server.
## Features
- **Rate-based peer pair creation**: Spawn peer pairs at configurable rates (e.g., 10, 20 pairs/sec)
- **Message exchange validation**: Each pair exchanges one message and validates encrypted body size > 0
- **Two exchange modes**:
- **Single message**: Each pair exchanges one message for validation
- **Continuous exchange**: Pairs continuously exchange messages for a specified duration (e.g., 30 seconds, 10 minutes)
- **Configurable message interval**: Control message send rate in continuous mode
- **Message exchange validation**: Validates encrypted body size > 0
- **Comprehensive metrics**: Tracks throughput, success/failure rates, and latency statistics
- **Local server testing**: Tests include embedded signal server for easy development
- **Worker pool pattern**: Efficient concurrent execution
- **Graceful shutdown**: Context-based cancellation
## Usage
@@ -15,19 +21,25 @@ Load testing tool for the NetBird signal server.
```bash
# Run all tests (includes load tests)
go test -v -timeout 60s
go test -v -timeout 2m
# Run specific load test
# Run specific single-message load tests
go test -v -run TestLoadTest_10PairsPerSecond -timeout 40s
go test -v -run TestLoadTest_20PairsPerSecond -timeout 40s
go test -v -run TestLoadTest_SmallBurst -timeout 30s
# Skip load tests in quick runs
# Run continuous exchange tests
go test -v -run TestLoadTest_ContinuousExchange_ShortBurst -timeout 30s
go test -v -run TestLoadTest_ContinuousExchange_30Seconds -timeout 2m
go test -v -run TestLoadTest_ContinuousExchange_10Minutes -timeout 15m
# Skip long-running tests in quick runs
go test -short
```
### Programmatic Usage
#### Single Message Exchange
```go
package main
@@ -55,6 +67,36 @@ func main() {
}
```
#### Continuous Message Exchange
```go
package main
import (
"github.com/netbirdio/netbird/signal/loadtest"
"time"
)
func main() {
config := loadtest.LoadTestConfig{
ServerURL: "http://localhost:10000",
PairsPerSecond: 10,
TotalPairs: 20,
MessageSize: 200,
ExchangeDuration: 10 * time.Minute, // Each pair exchanges messages for 10 minutes
MessageInterval: 200 * time.Millisecond, // Send message every 200ms
TestDuration: 15 * time.Minute, // Overall test timeout
}
lt := loadtest.NewLoadTest(config)
if err := lt.Run(); err != nil {
panic(err)
}
metrics := lt.GetMetrics()
metrics.PrintReport()
}
```
## Configuration Options
- **ServerURL**: Signal server URL (e.g., `http://localhost:10000` or `https://signal.example.com:443`)
@@ -62,6 +104,8 @@ func main() {
- **TotalPairs**: Total number of peer pairs to create
- **MessageSize**: Size of test message payload in bytes
- **TestDuration**: Maximum test duration (optional, 0 = no limit)
- **ExchangeDuration**: Duration for continuous message exchange per pair (0 = single message)
- **MessageInterval**: Interval between messages in continuous mode (default: 100ms)
- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
## Metrics

View File

@@ -102,7 +102,6 @@ func (c *Client) ReceiveMessage() (*proto.EncryptedMessage, error) {
// Close closes the client connection
func (c *Client) Close() error {
c.cancel()
close(c.msgChannel)
if c.conn != nil {
return c.conn.Close()
}
@@ -110,6 +109,7 @@ func (c *Client) Close() error {
}
func (c *Client) receiveMessages() {
defer close(c.msgChannel)
for {
msg, err := c.stream.Recv()
if err != nil {

View File

@@ -12,12 +12,14 @@ import (
// LoadTestConfig configuration for the load test
type LoadTestConfig struct {
ServerURL string
PairsPerSecond int
TotalPairs int
MessageSize int
TestDuration time.Duration
RampUpDuration time.Duration
ServerURL string
PairsPerSecond int
TotalPairs int
MessageSize int
TestDuration time.Duration
ExchangeDuration time.Duration
MessageInterval time.Duration
RampUpDuration time.Duration
}
// LoadTestMetrics metrics collected during the load test
@@ -67,8 +69,12 @@ func (lt *LoadTest) Run() error {
lt.metrics.endTime = time.Now()
}()
log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes",
lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize)
exchangeInfo := "single message"
if lt.config.ExchangeDuration > 0 {
exchangeInfo = fmt.Sprintf("continuous for %v", lt.config.ExchangeDuration)
}
log.Infof("Starting load test: %d pairs/sec, %d total pairs, message size: %d bytes, exchange: %s",
lt.config.PairsPerSecond, lt.config.TotalPairs, lt.config.MessageSize, exchangeInfo)
var wg sync.WaitGroup
pairChan := make(chan int, lt.config.PairsPerSecond)
@@ -159,6 +165,14 @@ func (lt *LoadTest) executePairExchange(pairID int) error {
testMessage[i] = byte(i % 256)
}
if lt.config.ExchangeDuration > 0 {
return lt.continuousExchange(pairID, sender, receiver, receiverID, testMessage)
}
return lt.singleExchange(sender, receiver, receiverID, testMessage)
}
func (lt *LoadTest) singleExchange(sender, receiver *Client, receiverID string, testMessage []byte) error {
startTime := time.Now()
if err := sender.SendMessage(receiverID, testMessage); err != nil {
@@ -195,6 +209,92 @@ func (lt *LoadTest) executePairExchange(pairID int) error {
}
}
func (lt *LoadTest) continuousExchange(pairID int, sender, receiver *Client, receiverID string, testMessage []byte) error {
exchangeCtx, cancel := context.WithTimeout(lt.ctx, lt.config.ExchangeDuration)
defer cancel()
messageInterval := lt.config.MessageInterval
if messageInterval == 0 {
messageInterval = 100 * time.Millisecond
}
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := lt.receiverLoop(exchangeCtx, receiver, pairID); err != nil && err != context.DeadlineExceeded && err != context.Canceled {
select {
case errChan <- err:
default:
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
if err := lt.senderLoop(exchangeCtx, sender, receiverID, testMessage, messageInterval); err != nil && err != context.DeadlineExceeded && err != context.Canceled {
select {
case errChan <- err:
default:
}
}
}()
wg.Wait()
select {
case err := <-errChan:
return err
default:
return nil
}
}
func (lt *LoadTest) senderLoop(ctx context.Context, sender *Client, receiverID string, message []byte, interval time.Duration) error {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
startTime := time.Now()
if err := sender.SendMessage(receiverID, message); err != nil {
lt.metrics.TotalErrors.Add(1)
log.Debugf("Send error: %v", err)
continue
}
lt.recordLatency(time.Since(startTime))
}
}
}
func (lt *LoadTest) receiverLoop(ctx context.Context, receiver *Client, pairID int) error {
for {
if ctx.Err() != nil {
return ctx.Err()
}
select {
case msg, ok := <-receiver.msgChannel:
if !ok {
return nil
}
if len(msg.Body) > 0 {
lt.metrics.TotalMessagesExchanged.Add(1)
}
case <-ctx.Done():
return ctx.Err()
case <-time.After(100 * time.Millisecond):
continue
}
}
}
func (lt *LoadTest) recordLatency(latency time.Duration) {
lt.metrics.mu.Lock()
defer lt.metrics.mu.Unlock()
@@ -234,23 +334,23 @@ func (m *LoadTestMetrics) PrintReport() {
if len(latencies) > 0 {
var total time.Duration
min := latencies[0]
max := latencies[0]
minLatency := latencies[0]
maxLatency := latencies[0]
for _, lat := range latencies {
total += lat
if lat < min {
min = lat
if lat < minLatency {
minLatency = lat
}
if lat > max {
max = lat
if lat > maxLatency {
maxLatency = lat
}
}
avg := total / time.Duration(len(latencies))
fmt.Printf("\nLatency Statistics:\n")
fmt.Printf(" Min: %v\n", min)
fmt.Printf(" Max: %v\n", max)
fmt.Printf(" Min: %v\n", minLatency)
fmt.Printf(" Max: %v\n", maxLatency)
fmt.Printf(" Avg: %v\n", avg)
}
fmt.Println("========================")

View File

@@ -103,6 +103,99 @@ func TestLoadTest_SmallBurst(t *testing.T) {
require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
}
func TestLoadTest_ContinuousExchange_30Seconds(t *testing.T) {
if testing.Short() {
t.Skip("Skipping continuous exchange test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 5,
TotalPairs: 10,
MessageSize: 100,
ExchangeDuration: 30 * time.Second,
MessageInterval: 100 * time.Millisecond,
TestDuration: 2 * time.Minute,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(2000), "Should exchange many messages over 30 seconds")
}
func TestLoadTest_ContinuousExchange_10Minutes(t *testing.T) {
if testing.Short() {
t.Skip("Skipping long continuous exchange test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 10,
TotalPairs: 20,
MessageSize: 200,
ExchangeDuration: 10 * time.Minute,
MessageInterval: 200 * time.Millisecond,
TestDuration: 15 * time.Minute,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(20), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(50000), "Should exchange many messages over 10 minutes")
}
func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 3,
TotalPairs: 5,
MessageSize: 50,
ExchangeDuration: 3 * time.Second,
MessageInterval: 100 * time.Millisecond,
TestDuration: 10 * time.Second,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(100), "Should exchange multiple messages in 3 seconds")
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
}
func BenchmarkLoadTest_Throughput(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()