Files
netbird/signal/loadtest/rate_loadtest_test.go
Maycon Santos 4787e28ae3 with retry
2025-10-12 15:54:39 +02:00

306 lines
8.4 KiB
Go

package loadtest
import (
"context"
"fmt"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/signal/server"
)
func TestLoadTest_10PairsPerSecond(t *testing.T) {
if testing.Short() {
t.Skip("Skipping load test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 10,
TotalPairs: 50,
MessageSize: 100,
TestDuration: 30 * time.Second,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(50), metrics.TotalPairsSent.Load(), "Should send all 50 pairs")
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
require.Equal(t, metrics.TotalMessagesExchanged.Load(), metrics.SuccessfulExchanges.Load(), "Messages exchanged should match successful exchanges")
}
func TestLoadTest_20PairsPerSecond(t *testing.T) {
if testing.Short() {
t.Skip("Skipping load test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 20,
TotalPairs: 100,
MessageSize: 500,
TestDuration: 30 * time.Second,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(100), metrics.TotalPairsSent.Load(), "Should send all 100 pairs")
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
}
func TestLoadTest_SmallBurst(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 5,
TotalPairs: 10,
MessageSize: 50,
TestDuration: 10 * time.Second,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(5), "At least 50% success rate")
require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
}
func TestLoadTest_ContinuousExchange_30Seconds(t *testing.T) {
if testing.Short() {
t.Skip("Skipping continuous exchange test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 5,
TotalPairs: 10,
MessageSize: 100,
ExchangeDuration: 30 * time.Second,
MessageInterval: 100 * time.Millisecond,
TestDuration: 2 * time.Minute,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(2000), "Should exchange many messages over 30 seconds")
}
func TestLoadTest_ContinuousExchange_10Minutes(t *testing.T) {
if testing.Short() {
t.Skip("Skipping long continuous exchange test in short mode")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 10,
TotalPairs: 20,
MessageSize: 200,
ExchangeDuration: 10 * time.Minute,
MessageInterval: 200 * time.Millisecond,
TestDuration: 15 * time.Minute,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(20), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(50000), "Should exchange many messages over 10 minutes")
}
func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 3,
TotalPairs: 5,
MessageSize: 50,
ExchangeDuration: 3 * time.Second,
MessageInterval: 100 * time.Millisecond,
TestDuration: 10 * time.Second,
ReportInterval: 50, // Report every 50 messages for testing
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(100), "Should exchange multiple messages in 3 seconds")
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
}
func TestLoadTest_ReconnectionConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
defer grpcServer.Stop()
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 3,
TotalPairs: 5,
MessageSize: 50,
ExchangeDuration: 2 * time.Second,
MessageInterval: 200 * time.Millisecond,
TestDuration: 5 * time.Second,
EnableReconnect: true,
InitialRetryDelay: 100 * time.Millisecond,
MaxReconnectDelay: 2 * time.Second,
}
loadTest := NewLoadTest(config)
err := loadTest.Run()
require.NoError(t, err)
metrics := loadTest.GetMetrics()
metrics.PrintReport()
// Test should complete successfully with reconnection enabled
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(0), "Should have exchanged messages")
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
// Reconnections counter should exist (even if zero for this stable test)
reconnections := metrics.TotalReconnections.Load()
require.GreaterOrEqual(t, reconnections, int64(0), "Reconnections metric should be tracked")
}
func BenchmarkLoadTest_Throughput(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
grpcServer, serverAddr := startBenchSignalServer(b, ctx)
defer grpcServer.Stop()
b.Run("5pairs-per-sec", func(b *testing.B) {
config := LoadTestConfig{
ServerURL: serverAddr,
PairsPerSecond: 5,
TotalPairs: b.N,
MessageSize: 100,
}
loadTest := NewLoadTest(config)
b.ResetTimer()
_ = loadTest.Run()
b.StopTimer()
metrics := loadTest.GetMetrics()
b.ReportMetric(float64(metrics.SuccessfulExchanges.Load()), "successful")
b.ReportMetric(float64(metrics.FailedExchanges.Load()), "failed")
})
}
func startTestSignalServerForLoad(t *testing.T, ctx context.Context) (*grpc.Server, string) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
grpcServer := grpc.NewServer()
signalServer, err := server.NewServer(ctx, otel.Meter("test"))
require.NoError(t, err)
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
go func() {
if err := grpcServer.Serve(listener); err != nil {
t.Logf("Server stopped: %v", err)
}
}()
time.Sleep(100 * time.Millisecond)
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
}
func startBenchSignalServer(b *testing.B, ctx context.Context) (*grpc.Server, string) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(b, err)
grpcServer := grpc.NewServer()
signalServer, err := server.NewServer(ctx, otel.Meter("bench"))
require.NoError(b, err)
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
go func() {
if err := grpcServer.Serve(listener); err != nil {
b.Logf("Server stopped: %v", err)
}
}()
time.Sleep(100 * time.Millisecond)
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
}