From ecb6f0831edb6ebcbd330c2732d84befedb92a96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 24 Jul 2024 16:26:26 +0200 Subject: [PATCH] Add metrics --- relay/client/client_test.go | 69 ++++++++++++++++++++++++++---------- relay/client/manager_test.go | 44 +++++++++++++++++------ relay/cmd/main.go | 44 ++++++++++++++++------- relay/metrics/realy.go | 21 +++++++++++ relay/server/relay.go | 16 +++++++-- relay/server/server.go | 15 ++++---- 6 files changed, 156 insertions(+), 53 deletions(-) create mode 100644 relay/metrics/realy.go diff --git a/relay/client/client_test.go b/relay/client/client_test.go index 7b768a4cf..b8685568b 100644 --- a/relay/client/client_test.go +++ b/relay/client/client_test.go @@ -11,6 +11,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" "github.com/netbirdio/netbird/relay/auth" "github.com/netbirdio/netbird/relay/auth/hmac" @@ -35,7 +36,10 @@ func TestMain(m *testing.M) { func TestClient(t *testing.T) { ctx := context.Background() - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { listenCfg := server.ListenerConfig{Address: serverListenAddr} @@ -58,7 +62,7 @@ func TestClient(t *testing.T) { } t.Log("alice connecting to server") clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Fatalf("failed to connect to server: %s", err) } @@ -133,7 +137,10 @@ func transfer(t *testing.T, testData []byte, peerPairs int) { serverAddress := fmt.Sprintf("127.0.0.1:%d", port) serverConnURL := fmt.Sprintf("rel://%s", serverAddress) - srv := server.NewServer(serverConnURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverConnURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { listenCfg := server.ListenerConfig{Address: serverAddress} @@ -259,7 +266,10 @@ func transfer(t *testing.T, testData []byte, peerPairs int) { func TestRegistration(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -274,7 +284,7 @@ func TestRegistration(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { _ = srv.Close() t.Fatalf("failed to connect to server: %s", err) @@ -330,7 +340,10 @@ func TestEcho(t *testing.T) { idAlice := "alice" idBob := "bob" srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -352,7 +365,7 @@ func TestEcho(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, idAlice) - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Fatalf("failed to connect to server: %s", err) } @@ -416,7 +429,10 @@ func TestBindToUnavailabePeer(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -439,7 +455,7 @@ func TestBindToUnavailabePeer(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Errorf("failed to connect to server: %s", err) } @@ -459,7 +475,10 @@ func TestBindReconnect(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -482,7 +501,7 @@ func TestBindReconnect(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Errorf("failed to connect to server: %s", err) } @@ -547,7 +566,10 @@ func TestCloseConn(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -570,7 +592,7 @@ func TestCloseConn(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Errorf("failed to connect to server: %s", err) } @@ -601,7 +623,10 @@ func TestCloseRelayConn(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -623,7 +648,7 @@ func TestCloseRelayConn(t *testing.T) { } clientAlice := NewClient(ctx, serverURL, hmacTokenStore, "alice") - err := clientAlice.Connect() + err = clientAlice.Connect() if err != nil { t.Fatalf("failed to connect to server: %s", err) } @@ -650,7 +675,10 @@ func TestCloseByServer(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv1 := server.NewServer(serverURL, false, av) + srv1, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { @@ -668,7 +696,7 @@ func TestCloseByServer(t *testing.T) { idAlice := "alice" log.Debugf("connect by alice") relayClient := NewClient(ctx, serverURL, hmacTokenStore, idAlice) - err := relayClient.Connect() + err = relayClient.Connect() if err != nil { log.Fatalf("failed to connect to server: %s", err) } @@ -700,7 +728,10 @@ func TestCloseByClient(t *testing.T) { ctx := context.Background() srvCfg := server.ListenerConfig{Address: serverListenAddr} - srv := server.NewServer(serverURL, false, av) + srv, err := server.NewServer(otel.Meter(""), serverURL, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -717,7 +748,7 @@ func TestCloseByClient(t *testing.T) { idAlice := "alice" log.Debugf("connect by alice") relayClient := NewClient(ctx, serverURL, hmacTokenStore, idAlice) - err := relayClient.Connect() + err = relayClient.Connect() if err != nil { log.Fatalf("failed to connect to server: %s", err) } diff --git a/relay/client/manager_test.go b/relay/client/manager_test.go index 8da1ef20f..928171175 100644 --- a/relay/client/manager_test.go +++ b/relay/client/manager_test.go @@ -6,6 +6,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel" "github.com/netbirdio/netbird/relay/server" ) @@ -16,7 +17,10 @@ func TestForeignConn(t *testing.T) { srvCfg1 := server.ListenerConfig{ Address: "localhost:1234", } - srv1 := server.NewServer(srvCfg1.Address, false, av) + srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv1.Listen(srvCfg1) @@ -39,7 +43,10 @@ func TestForeignConn(t *testing.T) { srvCfg2 := server.ListenerConfig{ Address: "localhost:2234", } - srv2 := server.NewServer(srvCfg2.Address, false, av) + srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan2 := make(chan error, 1) go func() { err := srv2.Listen(srvCfg2) @@ -64,7 +71,7 @@ func TestForeignConn(t *testing.T) { mCtx, cancel := context.WithCancel(ctx) defer cancel() clientAlice := NewManager(mCtx, toURL(srvCfg1), idAlice) - err := clientAlice.Serve() + err = clientAlice.Serve() if err != nil { t.Fatalf("failed to serve manager: %s", err) } @@ -122,7 +129,10 @@ func TestForeginConnClose(t *testing.T) { srvCfg1 := server.ListenerConfig{ Address: "localhost:1234", } - srv1 := server.NewServer(srvCfg1.Address, false, av) + srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv1.Listen(srvCfg1) @@ -145,7 +155,10 @@ func TestForeginConnClose(t *testing.T) { srvCfg2 := server.ListenerConfig{ Address: "localhost:2234", } - srv2 := server.NewServer(srvCfg2.Address, false, av) + srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan2 := make(chan error, 1) go func() { err := srv2.Listen(srvCfg2) @@ -170,7 +183,7 @@ func TestForeginConnClose(t *testing.T) { mCtx, cancel := context.WithCancel(ctx) defer cancel() mgr := NewManager(mCtx, toURL(srvCfg1), idAlice) - err := mgr.Serve() + err = mgr.Serve() if err != nil { t.Fatalf("failed to serve manager: %s", err) } @@ -191,7 +204,10 @@ func TestForeginAutoClose(t *testing.T) { srvCfg1 := server.ListenerConfig{ Address: "localhost:1234", } - srv1 := server.NewServer(srvCfg1.Address, false, av) + srv1, err := server.NewServer(otel.Meter(""), srvCfg1.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { t.Log("binding server 1.") @@ -217,7 +233,10 @@ func TestForeginAutoClose(t *testing.T) { srvCfg2 := server.ListenerConfig{ Address: "localhost:2234", } - srv2 := server.NewServer(srvCfg2.Address, false, av) + srv2, err := server.NewServer(otel.Meter(""), srvCfg2.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan2 := make(chan error, 1) go func() { t.Log("binding server 2.") @@ -244,7 +263,7 @@ func TestForeginAutoClose(t *testing.T) { mCtx, cancel := context.WithCancel(ctx) defer cancel() mgr := NewManager(mCtx, toURL(srvCfg1), idAlice) - err := mgr.Serve() + err = mgr.Serve() if err != nil { t.Fatalf("failed to serve manager: %s", err) } @@ -277,7 +296,10 @@ func TestAutoReconnect(t *testing.T) { srvCfg := server.ListenerConfig{ Address: "localhost:1234", } - srv := server.NewServer(srvCfg.Address, false, av) + srv, err := server.NewServer(otel.Meter(""), srvCfg.Address, false, av) + if err != nil { + t.Fatalf("failed to create server: %s", err) + } errChan := make(chan error, 1) go func() { err := srv.Listen(srvCfg) @@ -300,7 +322,7 @@ func TestAutoReconnect(t *testing.T) { mCtx, cancel := context.WithCancel(ctx) defer cancel() clientAlice := NewManager(mCtx, toURL(srvCfg), "alice") - err := clientAlice.Serve() + err = clientAlice.Serve() if err != nil { t.Fatalf("failed to serve manager: %s", err) } diff --git a/relay/cmd/main.go b/relay/cmd/main.go index b96482ef8..22dc025a8 100644 --- a/relay/cmd/main.go +++ b/relay/cmd/main.go @@ -2,7 +2,9 @@ package main import ( "crypto/tls" + "errors" "fmt" + "net/http" "os" "os/signal" "syscall" @@ -14,9 +16,14 @@ import ( "github.com/netbirdio/netbird/encryption" auth "github.com/netbirdio/netbird/relay/auth/hmac" "github.com/netbirdio/netbird/relay/server" + "github.com/netbirdio/netbird/signal/metrics" "github.com/netbirdio/netbird/util" ) +const ( + metricsPort = 9090 +) + type Config struct { ListenAddress string // in HA every peer connect to a common domain, the instance domain has been distributed during the p2p connection @@ -54,7 +61,7 @@ var ( Use: "relay", Short: "Relay service", Long: "Relay service for Netbird agents", - Run: execute, + RunE: execute, } ) @@ -110,11 +117,10 @@ func loadConfig(configFile string) (*Config, error) { return loadedConfig, err } -func execute(cmd *cobra.Command, args []string) { +func execute(cmd *cobra.Command, args []string) error { cfg, err := loadConfig(cfgFile) if err != nil { - log.Errorf("failed to load config: %s", err) - os.Exit(1) + return fmt.Errorf("failed to load config: %s", err) } err = cfg.Validate() @@ -123,21 +129,31 @@ func execute(cmd *cobra.Command, args []string) { os.Exit(1) } + metricsServer, err := metrics.NewServer(metricsPort, "") + if err != nil { + return fmt.Errorf("setup metrics: %v", err) + } + + go func() { + log.Infof("running metrics server: %s%s", metricsServer.Addr, metricsServer.Endpoint) + if err := metricsServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) { + log.Fatalf("Failed to start metrics server: %v", err) + } + }() + srvListenerCfg := server.ListenerConfig{ Address: cfg.ListenAddress, } if cfg.HasLetsEncrypt() { tlsCfg, err := setupTLSCertManager(cfg.LetsencryptDataDir, cfg.LetsencryptDomains...) if err != nil { - log.Errorf("%s", err) - os.Exit(1) + return fmt.Errorf("%s", err) } srvListenerCfg.TLSConfig = tlsCfg } else if cfg.HasCertConfig() { tlsCfg, err := encryption.LoadTLSConfig(cfg.TlsCertFile, cfg.TlsKeyFile) if err != nil { - log.Errorf("%s", err) - os.Exit(1) + return fmt.Errorf("%s", err) } srvListenerCfg.TLSConfig = tlsCfg } @@ -145,21 +161,23 @@ func execute(cmd *cobra.Command, args []string) { tlsSupport := srvListenerCfg.TLSConfig != nil authenticator := auth.NewTimedHMACValidator(cfg.AuthSecret, 24*time.Hour) - srv := server.NewServer(cfg.ExposedAddress, tlsSupport, authenticator) + srv, err := server.NewServer(metricsServer.Meter, cfg.ExposedAddress, tlsSupport, authenticator) + if err != nil { + return fmt.Errorf("failed to create relay server: %v", err) + } log.Infof("server will be available on: %s", srv.InstanceURL()) err = srv.Listen(srvListenerCfg) if err != nil { - log.Errorf("failed to bind server: %s", err) - os.Exit(1) + return fmt.Errorf("failed to bind server: %s", err) } waitForExitSignal() err = srv.Close() if err != nil { - log.Errorf("failed to close server: %s", err) - os.Exit(1) + return fmt.Errorf("failed to close server: %s", err) } + return nil } func setupTLSCertManager(letsencryptDataDir string, letsencryptDomains ...string) (*tls.Config, error) { diff --git a/relay/metrics/realy.go b/relay/metrics/realy.go new file mode 100644 index 000000000..29dc6557b --- /dev/null +++ b/relay/metrics/realy.go @@ -0,0 +1,21 @@ +package metrics + +import "go.opentelemetry.io/otel/metric" + +type Metrics struct { + metric.Meter + + Peers metric.Int64UpDownCounter +} + +func NewMetrics(meter metric.Meter) (*Metrics, error) { + peers, err := meter.Int64UpDownCounter("peers") + if err != nil { + return nil, err + } + + return &Metrics{ + Meter: meter, + Peers: peers, + }, nil +} diff --git a/relay/server/relay.go b/relay/server/relay.go index cb35c6e17..6dca3e62c 100644 --- a/relay/server/relay.go +++ b/relay/server/relay.go @@ -7,12 +7,15 @@ import ( "sync" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/metric" "github.com/netbirdio/netbird/relay/auth" "github.com/netbirdio/netbird/relay/messages" + "github.com/netbirdio/netbird/relay/metrics" ) type Relay struct { + metrics *metrics.Metrics validator auth.Validator store *Store @@ -22,8 +25,14 @@ type Relay struct { closeMu sync.RWMutex } -func NewRelay(exposedAddress string, tlsSupport bool, validator auth.Validator) *Relay { +func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) { + m, err := metrics.NewMetrics(meter) + if err != nil { + return nil, fmt.Errorf("creating app metrics: %v", err) + } + r := &Relay{ + metrics: m, validator: validator, store: NewStore(), } @@ -34,7 +43,7 @@ func NewRelay(exposedAddress string, tlsSupport bool, validator auth.Validator) r.instaceURL = fmt.Sprintf("rel://%s", exposedAddress) } - return r + return r, nil } func (r *Relay) Accept(conn net.Conn) { @@ -57,11 +66,12 @@ func (r *Relay) Accept(conn net.Conn) { peer := NewPeer(peerID, conn, r.store) peer.log.Infof("peer connected from: %s", conn.RemoteAddr()) r.store.AddPeer(peer) - + r.metrics.Peers.Add(context.Background(), 1) go func() { peer.Work() r.store.DeletePeer(peer) peer.log.Debugf("relay connection closed") + r.metrics.Peers.Add(context.Background(), -1) }() } diff --git a/relay/server/server.go b/relay/server/server.go index cac884372..0eff59eb2 100644 --- a/relay/server/server.go +++ b/relay/server/server.go @@ -8,6 +8,7 @@ import ( "time" log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/metric" "github.com/netbirdio/netbird/relay/auth" "github.com/netbirdio/netbird/relay/server/listener" @@ -26,14 +27,14 @@ type Server struct { wSListener listener.Listener } -func NewServer(exposedAddress string, tlsSupport bool, authValidator auth.Validator) *Server { - return &Server{ - relay: NewRelay( - exposedAddress, - tlsSupport, - authValidator, - ), +func NewServer(meter metric.Meter, exposedAddress string, tlsSupport bool, authValidator auth.Validator) (*Server, error) { + relay, err := NewRelay(meter, exposedAddress, tlsSupport, authValidator) + if err != nil { + return nil, err } + return &Server{ + relay: relay, + }, nil } func (r *Server) Listen(cfg ListenerConfig) error {