From 4787e28ae3c687b5bb40ab71271868fd210d246a Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Sun, 12 Oct 2025 15:54:39 +0200 Subject: [PATCH] with retry --- signal/loadtest/README.md | 53 +++++++- signal/loadtest/client.go | 139 +++++++++++++++++++- signal/loadtest/cmd/signal-loadtest/main.go | 14 ++ signal/loadtest/rate_loadtest.go | 31 ++++- signal/loadtest/rate_loadtest_test.go | 37 ++++++ 5 files changed, 261 insertions(+), 13 deletions(-) diff --git a/signal/loadtest/README.md b/signal/loadtest/README.md index 0b2448f41..4c94a4abb 100644 --- a/signal/loadtest/README.md +++ b/signal/loadtest/README.md @@ -9,9 +9,10 @@ Load testing tool for the NetBird signal server. - **Single message**: Each pair exchanges one message for validation - **Continuous exchange**: Pairs continuously exchange messages for a specified duration (e.g., 30 seconds, 10 minutes) - **TLS/HTTPS support**: Connect to TLS-enabled signal servers with optional certificate verification +- **Automatic reconnection**: Optional automatic reconnection with exponential backoff on connection loss - **Configurable message interval**: Control message send rate in continuous mode - **Message exchange validation**: Validates encrypted body size > 0 -- **Comprehensive metrics**: Tracks throughput, success/failure rates, and latency statistics +- **Comprehensive metrics**: Tracks throughput, success/failure rates, latency statistics, and reconnection counts - **Local server testing**: Tests include embedded signal server for easy development - **Worker pool pattern**: Efficient concurrent execution - **Graceful shutdown**: Context-based cancellation @@ -88,6 +89,17 @@ go build -o signal-loadtest -report-interval 5000 \ -log-level info +# With automatic reconnection +./signal-loadtest \ + -server http://localhost:10000 \ + -pairs-per-sec 10 \ + -total-pairs 50 \ + -exchange-duration 5m \ + -enable-reconnect \ + -initial-retry-delay 100ms \ + -max-reconnect-delay 30s \ + -log-level debug + # Show help ./signal-loadtest -h ``` @@ -111,6 +123,9 @@ The load test supports graceful shutdown via Ctrl+C (SIGINT/SIGTERM): - `-worker-pool-size`: Number of concurrent workers, 0 = auto (pairs-per-sec × 2) (default: 0) - `-channel-buffer-size`: Work queue buffer size, 0 = auto (pairs-per-sec × 4) (default: 0) - `-report-interval`: Report progress every N messages, 0 = no periodic reports (default: 10000) +- `-enable-reconnect`: Enable automatic reconnection on connection loss (default: false) +- `-initial-retry-delay`: Initial delay before first reconnection attempt (default: 100ms) +- `-max-reconnect-delay`: Maximum delay between reconnection attempts (default: 30s) - `-insecure-skip-verify`: Skip TLS certificate verification for self-signed certificates (default: false) - `-log-level`: Log level: trace, debug, info, warn, error (default: info) @@ -206,9 +221,41 @@ func main() { - **WorkerPoolSize**: Number of concurrent worker goroutines (0 = auto: pairs-per-sec × 2) - **ChannelBufferSize**: Work queue buffer size (0 = auto: pairs-per-sec × 4) - **ReportInterval**: Report progress every N messages (0 = no periodic reports, default: 10000) +- **EnableReconnect**: Enable automatic reconnection on connection loss (default: false) +- **InitialRetryDelay**: Initial delay before first reconnection attempt (default: 100ms) +- **MaxReconnectDelay**: Maximum delay between reconnection attempts (default: 30s) - **InsecureSkipVerify**: Skip TLS certificate verification (for self-signed certificates) - **RampUpDuration**: Gradual ramp-up period (not yet implemented) +### Reconnection Handling + +The load test supports automatic reconnection on connection loss: + +- **Disabled by default**: Connections will fail on any network interruption +- **When enabled**: Clients automatically reconnect with exponential backoff +- **Exponential backoff**: Starts at `InitialRetryDelay`, doubles on each failure, caps at `MaxReconnectDelay` +- **Transparent reconnection**: Message exchange continues after successful reconnection +- **Metrics tracking**: Total reconnection count is reported + +**Use cases:** +- Testing resilience to network interruptions +- Validating server restart behavior +- Simulating flaky network conditions +- Long-running stability tests + +**Example with reconnection:** +```go +config := loadtest.LoadTestConfig{ + ServerURL: "http://localhost:10000", + PairsPerSecond: 10, + TotalPairs: 20, + ExchangeDuration: 10 * time.Minute, + EnableReconnect: true, + InitialRetryDelay: 100 * time.Millisecond, + MaxReconnectDelay: 30 * time.Second, +} +``` + ### Performance Tuning When running high-load tests, you may need to adjust the worker pool and buffer sizes: @@ -238,6 +285,7 @@ The load test collects and reports: - **Failed Exchanges**: Failed message exchanges - **Total Messages Exchanged**: Count of successfully exchanged messages - **Total Errors**: Cumulative error count +- **Total Reconnections**: Number of automatic reconnections (if enabled) - **Throughput**: Pairs per second (actual) - **Latency Statistics**: Min, Max, Avg message exchange latency @@ -323,7 +371,8 @@ Latency Statistics: ## Future Enhancements -- [ ] TLS/HTTPS support for production servers +- [x] TLS/HTTPS support for production servers +- [x] Automatic reconnection with exponential backoff - [ ] Ramp-up period implementation - [ ] Percentile latency metrics (p50, p95, p99) - [ ] Connection reuse for multiple messages per pair diff --git a/signal/loadtest/client.go b/signal/loadtest/client.go index 34b82b2bf..858ddb79d 100644 --- a/signal/loadtest/client.go +++ b/signal/loadtest/client.go @@ -5,8 +5,10 @@ import ( "crypto/tls" "fmt" "strings" + "sync" "time" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -18,17 +20,27 @@ import ( // Client represents a signal client for load testing type Client struct { id string + serverURL string + config *ClientConfig conn *grpc.ClientConn client proto.SignalExchangeClient stream proto.SignalExchange_ConnectStreamClient ctx context.Context cancel context.CancelFunc msgChannel chan *proto.EncryptedMessage + + mu sync.RWMutex + reconnectCount int64 + connected bool + receiverStarted bool } // ClientConfig holds optional configuration for the client type ClientConfig struct { InsecureSkipVerify bool + EnableReconnect bool + MaxReconnectDelay time.Duration + InitialRetryDelay time.Duration } // NewClient creates a new signal client for load testing @@ -42,6 +54,16 @@ func NewClientWithConfig(serverURL, peerID string, config *ClientConfig) (*Clien config = &ClientConfig{} } + // Set default reconnect delays if not specified + if config.EnableReconnect { + if config.InitialRetryDelay == 0 { + config.InitialRetryDelay = 100 * time.Millisecond + } + if config.MaxReconnectDelay == 0 { + config.MaxReconnectDelay = 30 * time.Second + } + } + addr, opts, err := parseServerURL(serverURL, config.InsecureSkipVerify) if err != nil { return nil, fmt.Errorf("parse server URL: %w", err) @@ -57,11 +79,14 @@ func NewClientWithConfig(serverURL, peerID string, config *ClientConfig) (*Clien return &Client{ id: peerID, + serverURL: serverURL, + config: config, conn: conn, client: client, ctx: ctx, cancel: cancel, msgChannel: make(chan *proto.EncryptedMessage, 10), + connected: false, }, nil } @@ -79,13 +104,72 @@ func (c *Client) Connect() error { return fmt.Errorf("receive header: %w", err) } + c.mu.Lock() c.stream = stream - - go c.receiveMessages() + c.connected = true + if !c.receiverStarted { + c.receiverStarted = true + c.mu.Unlock() + go c.receiveMessages() + } else { + c.mu.Unlock() + } return nil } +// reconnectStream reconnects the stream without starting a new receiver goroutine +func (c *Client) reconnectStream() error { + if !c.config.EnableReconnect { + return fmt.Errorf("reconnect disabled") + } + + delay := c.config.InitialRetryDelay + attempt := 0 + + for { + select { + case <-c.ctx.Done(): + return c.ctx.Err() + case <-time.After(delay): + attempt++ + log.Debugf("Client %s reconnect attempt %d (delay: %v)", c.id, attempt, delay) + + md := metadata.New(map[string]string{proto.HeaderId: c.id}) + ctx := metadata.NewOutgoingContext(c.ctx, md) + + stream, err := c.client.ConnectStream(ctx) + if err != nil { + log.Debugf("Client %s reconnect attempt %d failed: %v", c.id, attempt, err) + delay *= 2 + if delay > c.config.MaxReconnectDelay { + delay = c.config.MaxReconnectDelay + } + continue + } + + if _, err := stream.Header(); err != nil { + log.Debugf("Client %s reconnect header failed: %v", c.id, err) + delay *= 2 + if delay > c.config.MaxReconnectDelay { + delay = c.config.MaxReconnectDelay + } + continue + } + + c.mu.Lock() + c.stream = stream + c.connected = true + c.reconnectCount++ + c.mu.Unlock() + + log.Debugf("Client %s reconnected successfully (attempt %d, total reconnects: %d)", + c.id, attempt, c.reconnectCount) + return nil + } + } +} + // SendMessage sends an encrypted message to a remote peer using the Send RPC func (c *Client) SendMessage(remotePeerID string, body []byte) error { msg := &proto.EncryptedMessage{ @@ -94,7 +178,7 @@ func (c *Client) SendMessage(remotePeerID string, body []byte) error { Body: body, } - ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(c.ctx, 10*time.Second) defer cancel() _, err := c.client.Send(ctx, msg) @@ -125,10 +209,41 @@ func (c *Client) Close() error { } func (c *Client) receiveMessages() { - defer close(c.msgChannel) for { - msg, err := c.stream.Recv() + c.mu.RLock() + stream := c.stream + c.mu.RUnlock() + + if stream == nil { + return + } + + msg, err := stream.Recv() if err != nil { + // Check if context is cancelled before attempting reconnection + select { + case <-c.ctx.Done(): + return + default: + } + + c.mu.Lock() + c.connected = false + c.mu.Unlock() + + log.Debugf("Client %s receive error: %v", c.id, err) + + // Attempt reconnection if enabled + if c.config.EnableReconnect { + if reconnectErr := c.reconnectStream(); reconnectErr != nil { + log.Debugf("Client %s reconnection failed: %v", c.id, reconnectErr) + return + } + // Successfully reconnected, continue receiving + continue + } + + // Reconnect disabled, exit return } @@ -140,6 +255,20 @@ func (c *Client) receiveMessages() { } } +// IsConnected returns whether the client is currently connected +func (c *Client) IsConnected() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.connected +} + +// GetReconnectCount returns the number of reconnections +func (c *Client) GetReconnectCount() int64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.reconnectCount +} + func parseServerURL(serverURL string, insecureSkipVerify bool) (string, []grpc.DialOption, error) { serverURL = strings.TrimSpace(serverURL) if serverURL == "" { diff --git a/signal/loadtest/cmd/signal-loadtest/main.go b/signal/loadtest/cmd/signal-loadtest/main.go index efebd06fa..b7abe21c7 100644 --- a/signal/loadtest/cmd/signal-loadtest/main.go +++ b/signal/loadtest/cmd/signal-loadtest/main.go @@ -27,6 +27,9 @@ var ( channelBufferSize int reportInterval int logLevel string + enableReconnect bool + maxReconnectDelay time.Duration + initialRetryDelay time.Duration ) func init() { @@ -42,6 +45,9 @@ func init() { flag.IntVar(&channelBufferSize, "channel-buffer-size", 0, "Channel buffer size (0 = auto: pairs-per-sec * 4)") flag.IntVar(&reportInterval, "report-interval", 10000, "Report progress every N messages (0 = no periodic reports)") flag.StringVar(&logLevel, "log-level", "info", "Log level (trace, debug, info, warn, error)") + flag.BoolVar(&enableReconnect, "enable-reconnect", true, "Enable automatic reconnection on connection loss") + flag.DurationVar(&maxReconnectDelay, "max-reconnect-delay", 30*time.Second, "Maximum delay between reconnection attempts") + flag.DurationVar(&initialRetryDelay, "initial-retry-delay", 100*time.Millisecond, "Initial delay before first reconnection attempt") } func main() { @@ -66,6 +72,9 @@ func main() { WorkerPoolSize: workerPoolSize, ChannelBufferSize: channelBufferSize, ReportInterval: reportInterval, + EnableReconnect: enableReconnect, + MaxReconnectDelay: maxReconnectDelay, + InitialRetryDelay: initialRetryDelay, } if err := validateConfig(config); err != nil { @@ -91,6 +100,11 @@ func main() { } else { log.Infof(" Mode: Single message exchange") } + if config.EnableReconnect { + log.Infof(" Reconnection: ENABLED") + log.Infof(" Initial retry delay: %v", config.InitialRetryDelay) + log.Infof(" Max reconnect delay: %v", config.MaxReconnectDelay) + } fmt.Println() // Set up signal handler for graceful shutdown diff --git a/signal/loadtest/rate_loadtest.go b/signal/loadtest/rate_loadtest.go index 931ca7d60..e0cede1ea 100644 --- a/signal/loadtest/rate_loadtest.go +++ b/signal/loadtest/rate_loadtest.go @@ -25,6 +25,9 @@ type LoadTestConfig struct { WorkerPoolSize int ChannelBufferSize int ReportInterval int // Report progress every N messages (0 = no periodic reports) + EnableReconnect bool + MaxReconnectDelay time.Duration + InitialRetryDelay time.Duration } // LoadTestMetrics metrics collected during the load test @@ -35,6 +38,7 @@ type LoadTestMetrics struct { SuccessfulExchanges atomic.Int64 FailedExchanges atomic.Int64 ActivePairs atomic.Int64 + TotalReconnections atomic.Int64 mu sync.Mutex latencies []time.Duration @@ -182,24 +186,33 @@ func (lt *LoadTest) pairWorker(wg *sync.WaitGroup, pairChan <-chan int) { } func (lt *LoadTest) executePairExchange(pairID int) error { - senderID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID) - receiverID := fmt.Sprintf("%s-%d", lt.config.IDPrefix, pairID) + senderID := fmt.Sprintf("%ssender-%d", lt.config.IDPrefix, pairID) + receiverID := fmt.Sprintf("%sreceiver-%d", lt.config.IDPrefix, pairID) clientConfig := &ClientConfig{ InsecureSkipVerify: lt.config.InsecureSkipVerify, + EnableReconnect: lt.config.EnableReconnect, + MaxReconnectDelay: lt.config.MaxReconnectDelay, + InitialRetryDelay: lt.config.InitialRetryDelay, } sender, err := NewClientWithConfig(lt.config.ServerURL, senderID, clientConfig) if err != nil { return fmt.Errorf("create sender: %w", err) } - defer sender.Close() + defer func() { + sender.Close() + lt.metrics.TotalReconnections.Add(sender.GetReconnectCount()) + }() receiver, err := NewClientWithConfig(lt.config.ServerURL, receiverID, clientConfig) if err != nil { return fmt.Errorf("create receiver: %w", err) } - defer receiver.Close() + defer func() { + receiver.Close() + lt.metrics.TotalReconnections.Add(receiver.GetReconnectCount()) + }() if err := sender.Connect(); err != nil { return fmt.Errorf("sender connect: %w", err) @@ -370,14 +383,15 @@ func (lt *LoadTest) progressReporter(wg *sync.WaitGroup, interval int) { elapsed := time.Since(lt.metrics.startTime) activePairs := lt.metrics.ActivePairs.Load() errors := lt.metrics.TotalErrors.Load() + reconnections := lt.metrics.TotalReconnections.Load() var msgRate float64 if elapsed.Seconds() > 0 { msgRate = float64(currentMessages) / elapsed.Seconds() } - log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %.2f msg/sec, elapsed: %v", - currentMessages, activePairs, errors, msgRate, elapsed.Round(time.Second)) + log.Infof("Progress: %d messages exchanged, %d active pairs, %d errors, %d reconnections, %.2f msg/sec, elapsed: %v", + currentMessages, activePairs, errors, reconnections, msgRate, elapsed.Round(time.Second)) lastReported = (currentMessages / int64(interval)) * int64(interval) } @@ -408,6 +422,11 @@ func (m *LoadTestMetrics) PrintReport() { fmt.Printf("Total Messages Exchanged: %d\n", m.TotalMessagesExchanged.Load()) fmt.Printf("Total Errors: %d\n", m.TotalErrors.Load()) + reconnections := m.TotalReconnections.Load() + if reconnections > 0 { + fmt.Printf("Total Reconnections: %d\n", reconnections) + } + if duration.Seconds() > 0 { throughput := float64(m.SuccessfulExchanges.Load()) / duration.Seconds() fmt.Printf("Throughput: %.2f pairs/sec\n", throughput) diff --git a/signal/loadtest/rate_loadtest_test.go b/signal/loadtest/rate_loadtest_test.go index c2e1a13ad..01443e082 100644 --- a/signal/loadtest/rate_loadtest_test.go +++ b/signal/loadtest/rate_loadtest_test.go @@ -197,6 +197,43 @@ func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) { require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully") } +func TestLoadTest_ReconnectionConfig(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx) + defer grpcServer.Stop() + + config := LoadTestConfig{ + ServerURL: serverAddr, + PairsPerSecond: 3, + TotalPairs: 5, + MessageSize: 50, + ExchangeDuration: 2 * time.Second, + MessageInterval: 200 * time.Millisecond, + TestDuration: 5 * time.Second, + EnableReconnect: true, + InitialRetryDelay: 100 * time.Millisecond, + MaxReconnectDelay: 2 * time.Second, + } + + loadTest := NewLoadTest(config) + err := loadTest.Run() + require.NoError(t, err) + + metrics := loadTest.GetMetrics() + metrics.PrintReport() + + // Test should complete successfully with reconnection enabled + require.Equal(t, int64(5), metrics.TotalPairsSent.Load()) + require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(0), "Should have exchanged messages") + require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully") + + // Reconnections counter should exist (even if zero for this stable test) + reconnections := metrics.TotalReconnections.Load() + require.GreaterOrEqual(t, reconnections, int64(0), "Reconnections metric should be tracked") +} + func BenchmarkLoadTest_Throughput(b *testing.B) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()