mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 00:06:38 +00:00
306 lines
8.4 KiB
Go
306 lines
8.4 KiB
Go
package loadtest
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"go.opentelemetry.io/otel"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/netbirdio/netbird/shared/signal/proto"
|
|
"github.com/netbirdio/netbird/signal/server"
|
|
)
|
|
|
|
func TestLoadTest_10PairsPerSecond(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping load test in short mode")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 10,
|
|
TotalPairs: 50,
|
|
MessageSize: 100,
|
|
TestDuration: 30 * time.Second,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(50), metrics.TotalPairsSent.Load(), "Should send all 50 pairs")
|
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
|
require.Equal(t, metrics.TotalMessagesExchanged.Load(), metrics.SuccessfulExchanges.Load(), "Messages exchanged should match successful exchanges")
|
|
}
|
|
|
|
func TestLoadTest_20PairsPerSecond(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping load test in short mode")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 20,
|
|
TotalPairs: 100,
|
|
MessageSize: 500,
|
|
TestDuration: 30 * time.Second,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(100), metrics.TotalPairsSent.Load(), "Should send all 100 pairs")
|
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(0), "Should have successful exchanges")
|
|
}
|
|
|
|
func TestLoadTest_SmallBurst(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 5,
|
|
TotalPairs: 10,
|
|
MessageSize: 50,
|
|
TestDuration: 10 * time.Second,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
|
|
require.Greater(t, metrics.SuccessfulExchanges.Load(), int64(5), "At least 50% success rate")
|
|
require.Less(t, metrics.FailedExchanges.Load(), int64(5), "Less than 50% failure rate")
|
|
}
|
|
|
|
func TestLoadTest_ContinuousExchange_30Seconds(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping continuous exchange test in short mode")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 5,
|
|
TotalPairs: 10,
|
|
MessageSize: 100,
|
|
ExchangeDuration: 30 * time.Second,
|
|
MessageInterval: 100 * time.Millisecond,
|
|
TestDuration: 2 * time.Minute,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(10), metrics.TotalPairsSent.Load())
|
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(2000), "Should exchange many messages over 30 seconds")
|
|
}
|
|
|
|
func TestLoadTest_ContinuousExchange_10Minutes(t *testing.T) {
|
|
if testing.Short() {
|
|
t.Skip("Skipping long continuous exchange test in short mode")
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 10,
|
|
TotalPairs: 20,
|
|
MessageSize: 200,
|
|
ExchangeDuration: 10 * time.Minute,
|
|
MessageInterval: 200 * time.Millisecond,
|
|
TestDuration: 15 * time.Minute,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(20), metrics.TotalPairsSent.Load())
|
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(50000), "Should exchange many messages over 10 minutes")
|
|
}
|
|
|
|
func TestLoadTest_ContinuousExchange_ShortBurst(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 3,
|
|
TotalPairs: 5,
|
|
MessageSize: 50,
|
|
ExchangeDuration: 3 * time.Second,
|
|
MessageInterval: 100 * time.Millisecond,
|
|
TestDuration: 10 * time.Second,
|
|
ReportInterval: 50, // Report every 50 messages for testing
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
|
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(100), "Should exchange multiple messages in 3 seconds")
|
|
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
|
|
}
|
|
|
|
func TestLoadTest_ReconnectionConfig(t *testing.T) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startTestSignalServerForLoad(t, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 3,
|
|
TotalPairs: 5,
|
|
MessageSize: 50,
|
|
ExchangeDuration: 2 * time.Second,
|
|
MessageInterval: 200 * time.Millisecond,
|
|
TestDuration: 5 * time.Second,
|
|
EnableReconnect: true,
|
|
InitialRetryDelay: 100 * time.Millisecond,
|
|
MaxReconnectDelay: 2 * time.Second,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
err := loadTest.Run()
|
|
require.NoError(t, err)
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
metrics.PrintReport()
|
|
|
|
// Test should complete successfully with reconnection enabled
|
|
require.Equal(t, int64(5), metrics.TotalPairsSent.Load())
|
|
require.Greater(t, metrics.TotalMessagesExchanged.Load(), int64(0), "Should have exchanged messages")
|
|
require.Equal(t, int64(5), metrics.SuccessfulExchanges.Load(), "All pairs should complete successfully")
|
|
|
|
// Reconnections counter should exist (even if zero for this stable test)
|
|
reconnections := metrics.TotalReconnections.Load()
|
|
require.GreaterOrEqual(t, reconnections, int64(0), "Reconnections metric should be tracked")
|
|
}
|
|
|
|
func BenchmarkLoadTest_Throughput(b *testing.B) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
grpcServer, serverAddr := startBenchSignalServer(b, ctx)
|
|
defer grpcServer.Stop()
|
|
|
|
b.Run("5pairs-per-sec", func(b *testing.B) {
|
|
config := LoadTestConfig{
|
|
ServerURL: serverAddr,
|
|
PairsPerSecond: 5,
|
|
TotalPairs: b.N,
|
|
MessageSize: 100,
|
|
}
|
|
|
|
loadTest := NewLoadTest(config)
|
|
b.ResetTimer()
|
|
_ = loadTest.Run()
|
|
b.StopTimer()
|
|
|
|
metrics := loadTest.GetMetrics()
|
|
b.ReportMetric(float64(metrics.SuccessfulExchanges.Load()), "successful")
|
|
b.ReportMetric(float64(metrics.FailedExchanges.Load()), "failed")
|
|
})
|
|
}
|
|
|
|
func startTestSignalServerForLoad(t *testing.T, ctx context.Context) (*grpc.Server, string) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(t, err)
|
|
|
|
grpcServer := grpc.NewServer()
|
|
|
|
signalServer, err := server.NewServer(ctx, otel.Meter("test"))
|
|
require.NoError(t, err)
|
|
|
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
|
|
|
go func() {
|
|
if err := grpcServer.Serve(listener); err != nil {
|
|
t.Logf("Server stopped: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
|
}
|
|
|
|
func startBenchSignalServer(b *testing.B, ctx context.Context) (*grpc.Server, string) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.NoError(b, err)
|
|
|
|
grpcServer := grpc.NewServer()
|
|
|
|
signalServer, err := server.NewServer(ctx, otel.Meter("bench"))
|
|
require.NoError(b, err)
|
|
|
|
proto.RegisterSignalExchangeServer(grpcServer, signalServer)
|
|
|
|
go func() {
|
|
if err := grpcServer.Serve(listener); err != nil {
|
|
b.Logf("Server stopped: %v", err)
|
|
}
|
|
}()
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
return grpcServer, fmt.Sprintf("http://%s", listener.Addr().String())
|
|
}
|