with retry

This commit is contained in:
Maycon Santos
2025-10-12 15:54:39 +02:00
parent f9a71e98c7
commit 4787e28ae3
5 changed files with 261 additions and 13 deletions

View File

@@ -9,9 +9,10 @@ Load testing tool for the NetBird signal server.
- **Single message**: Each pair exchanges one message for validation
- **Continuous exchange**: Pairs continuously exchange messages for a specified duration (e.g., 30 seconds, 10 minutes)
- **TLS/HTTPS support**: Connect to TLS-enabled signal servers with optional certificate verification
- **Automatic reconnection**: Optional automatic reconnection with exponential backoff on connection loss
- **Configurable message interval**: Control message send rate in continuous mode
- **Message exchange validation**: Validates encrypted body size > 0
- **Comprehensive metrics**: Tracks throughput, success/failure rates, and latency statistics
- **Comprehensive metrics**: Tracks throughput, success/failure rates, latency statistics, and reconnection counts
- **Local server testing**: Tests include embedded signal server for easy development
- **Worker pool pattern**: Efficient concurrent execution
- **Graceful shutdown**: Context-based cancellation
@@ -88,6 +89,17 @@ go build -o signal-loadtest
-report-interval 5000 \
-log-level info
# With automatic reconnection
./signal-loadtest \
-server http://localhost:10000 \
-pairs-per-sec 10 \
-total-pairs 50 \
-exchange-duration 5m \
-enable-reconnect \
-initial-retry-delay 100ms \
-max-reconnect-delay 30s \
-log-level debug
# Show help
./signal-loadtest -h
```
@@ -111,6 +123,9 @@ The load test supports graceful shutdown via Ctrl+C (SIGINT/SIGTERM):
- `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0)
- `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0)
- `-report-interval`: Report progress every N messages, 0 = no periodic reports (default: 10000)
- `-enable-reconnect`: Enable automatic reconnection on connection loss (default: false)
- `-initial-retry-delay`: Initial delay before first reconnection attempt (default: 100ms)
- `-max-reconnect-delay`: Maximum delay between reconnection attempts (default: 30s)
- `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false)
- `-log-level`: Log level: trace, debug, info, warn, error (default: info)
@@ -206,9 +221,41 @@ func main() {
- **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2)
- **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4)
- **ReportInterval**: Report progress every N messages (0 = no periodic reports, default: 10000)
- **EnableReconnect**: Enable automatic reconnection on connection loss (default: false)
- **InitialRetryDelay**: Initial delay before first reconnection attempt (default: 100ms)
- **MaxReconnectDelay**: Maximum delay between reconnection attempts (default: 30s)
- **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates)
- **RampUpDuration**: Gradual ramp-up period (not yet implemented)
### Reconnection Handling
The load test supports automatic reconnection on connection loss:
- **Disabled by default**: without reconnection enabled, any network interruption causes the connection to fail
- **When enabled**: Clients automatically reconnect with exponential backoff
- **Exponential backoff**: Starts at `InitialRetryDelay`, doubles on each failure, caps at `MaxReconnectDelay`
- **Transparent reconnection**: Message exchange continues after successful reconnection
- **Metrics tracking**: Total reconnection count is reported
**Use cases:**
- Testing resilience to network interruptions
- Validating server restart behavior
- Simulating flaky network conditions
- Long-running stability tests
**Example with reconnection:**
```go
config := loadtest.LoadTestConfig{
ServerURL: "http://localhost:10000",
PairsPerSecond: 10,
TotalPairs: 20,
ExchangeDuration: 10 * time.Minute,
EnableReconnect: true,
InitialRetryDelay: 100 * time.Millisecond,
MaxReconnectDelay: 30 * time.Second,
}
```
### Performance Tuning
When running high-load tests, you may need to adjust the worker pool and buffer sizes:
@@ -238,6 +285,7 @@ The load test collects and reports:
- **Failed Exchanges**: Failed message exchanges
- **Total Messages Exchanged**: Count of successfully exchanged messages
- **Total Errors**: Cumulative error count
- **Total Reconnections**: Number of automatic reconnections (if enabled)
- **Throughput**: Pairs per second (actual)
- **Latency Statistics**: Min, Max, Avg message exchange latency
@@ -323,7 +371,8 @@ Latency Statistics:
## Future Enhancements
- [ ] TLS/HTTPS support for production servers
- [x] TLS/HTTPS support for production servers
- [x] Automatic reconnection with exponential backoff
- [ ] Ramp-up period implementation
- [ ] Percentile latency metrics (p50, p95, p99)
- [ ] Connection reuse for multiple messages per pair

View File

@@ -5,8 +5,10 @@ import (
"crypto/tls"
"fmt"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
@@ -18,17 +20,27 @@ import (
// Client represents a signal client for load testing
type Client struct {
	id        string // unique peer ID, sent to the server in the gRPC metadata header
	serverURL string // original server URL this client was created with
	config    *ClientConfig
	conn      *grpc.ClientConn
	client    proto.SignalExchangeClient
	stream    proto.SignalExchange_ConnectStreamClient // current receive stream; replaced on reconnect
	ctx       context.Context
	cancel    context.CancelFunc
	msgChannel chan *proto.EncryptedMessage // delivers messages received on the stream to consumers
	// mu guards stream, connected, reconnectCount, and receiverStarted.
	mu              sync.RWMutex
	reconnectCount  int64 // number of successful reconnections; exposed via GetReconnectCount
	connected       bool  // true while a stream is established; exposed via IsConnected
	receiverStarted bool  // ensures at most one receiveMessages goroutine is ever started
}
// ClientConfig holds optional configuration for the client
type ClientConfig struct {
	InsecureSkipVerify bool          // skip TLS certificate verification (for self-signed certificates)
	EnableReconnect    bool          // automatically reconnect with exponential backoff on stream errors
	MaxReconnectDelay  time.Duration // backoff cap; defaults to 30s when reconnect is enabled and this is zero
	InitialRetryDelay  time.Duration // first backoff delay; defaults to 100ms when reconnect is enabled and this is zero
}
// NewClient creates a new signal client for load testing
@@ -42,6 +54,16 @@ func NewClientWithConfig(serverURL, peerID string, config *ClientConfig) (*Clien
config = &ClientConfig{}
}
// Set default reconnect delays if not specified
if config.EnableReconnect {
if config.InitialRetryDelay == 0 {
config.InitialRetryDelay = 100 * time.Millisecond
}
if config.MaxReconnectDelay == 0 {
config.MaxReconnectDelay = 30 * time.Second
}
}
addr, opts, err := parseServerURL(serverURL, config.InsecureSkipVerify)
if err != nil {
return nil, fmt.Errorf("parse server URL: %w", err)
@@ -57,11 +79,14 @@ func NewClientWithConfig(serverURL, peerID string, config *ClientConfig) (*Clien
return &Client{
id: peerID,
serverURL: serverURL,
config: config,
conn: conn,
client: client,
ctx: ctx,
cancel: cancel,
msgChannel: make(chan *proto.EncryptedMessage, 10),
connected: false,
}, nil
}
@@ -79,13 +104,72 @@ func (c *Client) Connect() error {
return fmt.Errorf("receive header: %w", err)
}
c.mu.Lock()
c.stream = stream
go c.receiveMessages()
c.connected = true
if !c.receiverStarted {
c.receiverStarted = true
c.mu.Unlock()
go c.receiveMessages()
} else {
c.mu.Unlock()
}
return nil
}
// reconnectStream reconnects the stream without starting a new receiver goroutine.
// It blocks until a reconnect succeeds or the client context is cancelled,
// waiting InitialRetryDelay before the first attempt and doubling the delay on
// each failure, capped at MaxReconnectDelay. The existing receiveMessages loop
// picks up the refreshed stream; callers must not spawn another receiver.
// Returns an error immediately if reconnection is disabled in the config.
func (c *Client) reconnectStream() error {
	if !c.config.EnableReconnect {
		return fmt.Errorf("reconnect disabled")
	}

	delay := c.config.InitialRetryDelay
	// backoff doubles the delay, capping it at MaxReconnectDelay. Factored out
	// because both failure paths below (dial error, header error) share it.
	backoff := func() {
		delay *= 2
		if delay > c.config.MaxReconnectDelay {
			delay = c.config.MaxReconnectDelay
		}
	}

	attempt := 0
	for {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()
		case <-time.After(delay):
		}

		attempt++
		log.Debugf("Client %s reconnect attempt %d (delay: %v)", c.id, attempt, delay)

		md := metadata.New(map[string]string{proto.HeaderId: c.id})
		ctx := metadata.NewOutgoingContext(c.ctx, md)

		stream, err := c.client.ConnectStream(ctx)
		if err != nil {
			log.Debugf("Client %s reconnect attempt %d failed: %v", c.id, attempt, err)
			backoff()
			continue
		}

		// Waiting for the header confirms the server accepted the stream.
		if _, err := stream.Header(); err != nil {
			log.Debugf("Client %s reconnect header failed: %v", c.id, err)
			backoff()
			continue
		}

		c.mu.Lock()
		c.stream = stream
		c.connected = true
		c.reconnectCount++
		// Capture the counter under the lock: reading c.reconnectCount after
		// Unlock would race with concurrent GetReconnectCount/reconnect calls.
		total := c.reconnectCount
		c.mu.Unlock()

		log.Debugf("Client %s reconnected successfully (attempt %d, total reconnects: %d)",
			c.id, attempt, total)
		return nil
	}
}
// SendMessage sends an encrypted message to a remote peer using the Send RPC
func (c *Client) SendMessage(remotePeerID string, body []byte) error {
msg := &proto.EncryptedMessage{
@@ -94,7 +178,7 @@ func (c *Client) SendMessage(remotePeerID string, body []byte) error {
Body: body,
}
ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second)
defer cancel()
_, err := c.client.Send(ctx, msg)
@@ -125,10 +209,41 @@ func (c *Client) Close() error {
}
func (c *Client) receiveMessages() {
defer close(c.msgChannel)
for {
msg, err := c.stream.Recv()
c.mu.RLock()
stream := c.stream
c.mu.RUnlock()
if stream == nil {
return
}
msg, err := stream.Recv()
if err != nil {
// Check if context is cancelled before attempting reconnection
select {
case <-c.ctx.Done():
return
default:
}
c.mu.Lock()
c.connected = false
c.mu.Unlock()
log.Debugf("Client %s receive error: %v", c.id, err)
// Attempt reconnection if enabled
if c.config.EnableReconnect {
if reconnectErr := c.reconnectStream(); reconnectErr != nil {
log.Debugf("Client %s reconnection failed: %v", c.id, reconnectErr)
return
}
// Successfully reconnected, continue receiving
continue
}
// Reconnect disabled, exit
return
}
@@ -140,6 +255,20 @@ func (c *Client) receiveMessages() {
}
}
// IsConnected reports whether the client currently holds an established stream.
func (c *Client) IsConnected() bool {
	c.mu.RLock()
	connected := c.connected
	c.mu.RUnlock()
	return connected
}
// GetReconnectCount reports how many times this client has successfully reconnected.
func (c *Client) GetReconnectCount() int64 {
	c.mu.RLock()
	count := c.reconnectCount
	c.mu.RUnlock()
	return count
}
func parseServerURL(serverURL string, insecureSkipVerify bool) (string, []grpc.DialOption, error) {
serverURL = strings.TrimSpace(serverURL)
if serverURL == "" {

View File

@@ -27,6 +27,9 @@ var (
channelBufferSize int
reportInterval int
logLevel string
enableReconnect bool
maxReconnectDelay time.Duration
initialRetryDelay time.Duration
)
func init() {
@@ -42,6 +45,9 @@ func init() {
flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)")
flag.IntVar(&reportInterval, "report-interval", 10000, "Report progress every N messages (0 = no periodic reports)")
flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)")
flag.BoolVar(&enableReconnect, "enable-reconnect", true, "Enable automatic reconnection on connection loss")
flag.DurationVar(&maxReconnectDelay, "max-reconnect-delay", 30*time.Second, "Maximum delay between reconnection attempts")
flag.DurationVar(&initialRetryDelay, "initial-retry-delay", 100*time.Millisecond, "Initial delay before first reconnection attempt")
}
func main() {
@@ -66,6 +72,9 @@ func main() {
WorkerPoolSize: workerPoolSize,
ChannelBufferSize: channelBufferSize,
ReportInterval: reportInterval,
EnableReconnect: enableReconnect,
MaxReconnectDelay: maxReconnectDelay,
InitialRetryDelay: initialRetryDelay,
}
if err := validateConfig(config); err != nil {
@@ -91,6 +100,11 @@ func main() {
} else {
log.Infof(" Mode: Single message exchange")
}
if config.EnableReconnect {
log.Infof(" Reconnection: ENABLED")
log.Infof(" Initial retry delay: %v", config.InitialRetryDelay)
log.Infof(" Max reconnect delay: %v", config.MaxReconnectDelay)
}
fmt.Println()
// Set up signal handler for graceful shutdown

View File

@@ -25,6 +25,9 @@ type LoadTestConfig struct {
WorkerPoolSize int
ChannelBufferSize int
ReportInterval int // Report progress every N messages (0 = no periodic reports)
EnableReconnect bool
MaxReconnectDelay time.Duration
InitialRetryDelay time.Duration
}
// LoadTestMetrics metrics collected during the load test
@@ -35,6 +38,7 @@ type LoadTestMetrics struct {
SuccessfulExchanges atomic.Int64
FailedExchanges atomic.Int64
ActivePairs atomic.Int64
TotalReconnections atomic.Int64
mu sync.Mutex
latencies []time.Duration
@@ -182,24 +186,33 @@ func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) {
}
func (lt *LoadTest) executePairExchange(pairID int) error {
senderID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID)
receiverID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID)
senderID := fmt.Sprintf("%ssender-%d", lt.config.IDPrefix, pairID)
receiverID := fmt.Sprintf("%sreceiver-%d", lt.config.IDPrefix, pairID)
clientConfig := &ClientConfig{
InsecureSkipVerify: lt.config.InsecureSkipVerify,
EnableReconnect: lt.config.EnableReconnect,
MaxReconnectDelay: lt.config.MaxReconnectDelay,
InitialRetryDelay: lt.config.InitialRetryDelay,
}
sender, err := NewClientWithConfig(lt.config.ServerURL, senderID, clientConfig)
if err != nil {
return fmt.Errorf("create sender: %w", err)
}
defer sender.Close()
defer func() {
sender.Close()
lt.metrics.TotalReconnections.Add(sender.GetReconnectCount())
}()
receiver, err := NewClientWithConfig(lt.config.ServerURL, receiverID, clientConfig)
if err != nil {
return fmt.Errorf("create receiver: %w", err)
}
defer receiver.Close()
defer func() {
receiver.Close()
lt.metrics.TotalReconnections.Add(receiver.GetReconnectCount())
}()
if err := sender.Connect(); err != nil {
return fmt.Errorf("sender connect: %w", err)
@@ -370,14 +383,15 @@ func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) {
elapsed := time.Since(lt.metrics.startTime)
activePairs := lt.metrics.ActivePairs.Load()
errors := lt.metrics.TotalErrors.Load()
reconnections := lt.metrics.TotalReconnections.Load()
var msgRate float64
if elapsed.Seconds() > 0 {
msgRate = float64(currentMessages) / elapsed.Seconds()
}
log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %.2f msg/sec, elapsed: %v",
currentMessages, activePairs, errors, msgRate, elapsed.Round(time.Second))
log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %d reconnections, %.2f msg/sec, elapsed: %v",
currentMessages, activePairs, errors, reconnections, msgRate, elapsed.Round(time.Second))
lastReported = (currentMessages / int64(interval)) * int64(interval)
}
@@ -408,6 +422,11 @@ func (m *LoadTestMetrics) PrintReport() {
fmt.Printf("Total Messages Exchanged: %d\n", m.TotalMessagesExchanged.Load())
fmt.Printf("Total Errors: %d\n", m.TotalErrors.Load())
reconnections := m.TotalReconnections.Load()
if reconnections > 0 {
fmt.Printf("Total Reconnections: %d\n", reconnections)
}
if duration.Seconds() > 0 {
throughput := float64(m.SuccessfulExchanges.Load()) / duration.Seconds()
fmt.Printf("Throughput: %.2f pairs/sec\n", throughput)

View File

@@ -197,6 +197,43 @@ func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
}
// TestLoadTest_ReconnectionConfig runs a short continuous-exchange load test
// with automatic reconnection enabled against the embedded signal server and
// verifies that the run completes and the reconnection metric is reported.
func TestLoadTest_ReconnectionConfig(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
	defer grpcServer.Stop()

	cfg := LoadTestConfig{
		ServerURL:         serverAddr,
		PairsPerSecond:    3,
		TotalPairs:        5,
		MessageSize:       50,
		ExchangeDuration:  2 * time.Second,
		MessageInterval:   200 * time.Millisecond,
		TestDuration:      5 * time.Second,
		EnableReconnect:   true,
		InitialRetryDelay: 100 * time.Millisecond,
		MaxReconnectDelay: 2 * time.Second,
	}

	lt := NewLoadTest(cfg)
	require.NoError(t, lt.Run())

	m := lt.GetMetrics()
	m.PrintReport()

	// The server is stable in this test, so the run should succeed end to end;
	// the reconnection counter merely has to be tracked (zero is legitimate).
	require.Equal(t, int64(5), m.TotalPairsSent.Load())
	require.Greater(t, m.TotalMessagesExchanged.Load(), int64(0), "Should have exchanged messages")
	require.Equal(t, int64(5), m.SuccessfulExchanges.Load(), "All pairs should complete successfully")
	require.GreaterOrEqual(t, m.TotalReconnections.Load(), int64(0), "Reconnections metric should be tracked")
}
func BenchmarkLoadTest_Throughput(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()