From 351db3dd498d6f191b6d92b7771d7824a462d93f Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 7 Aug 2024 17:35:58 +0200 Subject: [PATCH] Feature/relay integration metrics (#2376) Extend metrics - TransferBytesSent - Active/idle peers - Connection times --- relay/metrics/realy.go | 131 ++++++++++++++++++++++++++++++++++++++--- relay/server/peer.go | 33 +++++++---- relay/server/relay.go | 23 +++++--- 3 files changed, 157 insertions(+), 30 deletions(-) diff --git a/relay/metrics/realy.go b/relay/metrics/realy.go index 29dc6557b..80e12ee6b 100644 --- a/relay/metrics/realy.go +++ b/relay/metrics/realy.go @@ -1,21 +1,136 @@ package metrics -import "go.opentelemetry.io/otel/metric" +import ( + "context" + "sync" + "time" + + log "github.com/sirupsen/logrus" + "go.opentelemetry.io/otel/metric" +) + +const ( + idleTimeout = 30 * time.Second +) type Metrics struct { metric.Meter - Peers metric.Int64UpDownCounter + TransferBytesSent metric.Int64Counter + TransferBytesRecv metric.Int64Counter + + peers metric.Int64UpDownCounter + peerActivityChan chan string + peerLastActive map[string]time.Time + mutexActivity sync.Mutex + ctx context.Context } -func NewMetrics(meter metric.Meter) (*Metrics, error) { - peers, err := meter.Int64UpDownCounter("peers") +func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) { + bytesSent, err := meter.Int64Counter("relay_transfer_sent_bytes_total") if err != nil { return nil, err } - return &Metrics{ - Meter: meter, - Peers: peers, - }, nil + bytesRecv, err := meter.Int64Counter("relay_transfer_received_bytes_total") + if err != nil { + return nil, err + } + + peers, err := meter.Int64UpDownCounter("relay_peers") + if err != nil { + return nil, err + } + + peersActive, err := meter.Int64ObservableGauge("relay_peers_active") + if err != nil { + return nil, err + } + + peersIdle, err := meter.Int64ObservableGauge("relay_peers_idle") + if err != nil { + return nil, err + } + + m := &Metrics{ + Meter: meter, + TransferBytesSent: bytesSent, + TransferBytesRecv: bytesRecv, + peers: peers, + + ctx: ctx, + peerActivityChan: make(chan string, 10), + peerLastActive: make(map[string]time.Time), + } + + _, err = meter.RegisterCallback( + func(ctx context.Context, o metric.Observer) error { + active, idle := m.calculateActiveIdleConnections() + o.ObserveInt64(peersActive, active) + o.ObserveInt64(peersIdle, idle) + return nil + }, + peersActive, peersIdle, + ) + if err != nil { + return nil, err + } + + go m.readPeerActivity() + return m, nil +} + +// PeerConnected increments the number of connected peers and increments number of idle connections +func (m *Metrics) PeerConnected(id string) { + m.peers.Add(m.ctx, 1) + m.mutexActivity.Lock() + defer m.mutexActivity.Unlock() + + m.peerLastActive[id] = time.Time{} +} + +// PeerDisconnected decrements the number of connected peers and decrements number of idle or active connections +func (m *Metrics) PeerDisconnected(id string) { + m.peers.Add(m.ctx, -1) + m.mutexActivity.Lock() + defer m.mutexActivity.Unlock() + + delete(m.peerLastActive, id) +} + +// PeerActivity increases the active connections +func (m *Metrics) PeerActivity(peerID string) { + select { + case m.peerActivityChan <- peerID: + default: + log.Errorf("peer activity channel is full, dropping activity metrics for peer %s", peerID) + } +} + +func (m *Metrics) calculateActiveIdleConnections() (int64, int64) { + active, idle := int64(0), int64(0) + m.mutexActivity.Lock() + defer m.mutexActivity.Unlock() + + for _, lastActive := range m.peerLastActive { + if time.Since(lastActive) > idleTimeout { + idle++ + } else { + active++ + } + } + return active, idle +} + +func (m *Metrics) readPeerActivity() { + for { + select { + case peerID := <-m.peerActivityChan: + m.mutexActivity.Lock() + m.peerLastActive[peerID] = time.Now() + m.mutexActivity.Unlock() + case <-m.ctx.Done(): + return + } + } } diff --git a/relay/server/peer.go b/relay/server/peer.go index 52f3b185b..e3595b2e7 100644 --- a/relay/server/peer.go +++ b/relay/server/peer.go @@ -12,6 +12,7 @@ import ( "github.com/netbirdio/netbird/relay/healthcheck" "github.com/netbirdio/netbird/relay/messages" + "github.com/netbirdio/netbird/relay/metrics" ) const ( @@ -20,23 +21,25 @@ const ( // Peer represents a peer connection type Peer struct { - log *log.Entry - idS string - idB []byte - conn net.Conn - connMu sync.RWMutex - store *Store + metrics *metrics.Metrics + log *log.Entry + idS string + idB []byte + conn net.Conn + connMu sync.RWMutex + store *Store } // NewPeer creates a new Peer instance and prepare custom logging -func NewPeer(id []byte, conn net.Conn, store *Store) *Peer { +func NewPeer(metrics *metrics.Metrics, id []byte, conn net.Conn, store *Store) *Peer { stringID := messages.HashIDToString(id) return &Peer{ - log: log.WithField("peer_id", stringID), - idS: stringID, - idB: id, - conn: conn, - store: store, + metrics: metrics, + log: log.WithField("peer_id", stringID), + idS: stringID, + idB: id, + conn: conn, + store: store, } } @@ -70,6 +73,8 @@ func (p *Peer) Work() { case messages.MsgTypeHealthCheck: hc.OnHCResponse() case messages.MsgTypeTransport: + p.metrics.TransferBytesRecv.Add(ctx, int64(n)) + p.metrics.PeerActivity(p.String()) p.handleTransportMsg(msg) case messages.MsgTypeClose: p.log.Infof("peer exited gracefully") @@ -167,8 +172,10 @@ func (p *Peer) handleTransportMsg(msg []byte) { p.log.Errorf("failed to update transport message: %s", err) return } - _, err = dp.Write(msg) + n, err := dp.Write(msg) if err != nil { p.log.Errorf("failed to write transport message to: %s", dp.String()) + return } + p.metrics.TransferBytesSent.Add(context.Background(), int64(n)) } diff --git a/relay/server/relay.go b/relay/server/relay.go index 4621e96b6..45f52824a 100644 --- a/relay/server/relay.go +++ b/relay/server/relay.go @@ -17,8 +17,9 @@ import ( // Relay represents the relay server type Relay struct { - metrics *metrics.Metrics - validator auth.Validator + metrics *metrics.Metrics + metricsCancel context.CancelFunc + validator auth.Validator store *Store instanceURL string @@ -43,15 +44,18 @@ type Relay struct { // A pointer to a Relay instance and an error. If the Relay instance is successfully created, the error is nil. // Otherwise, the error contains the details of what went wrong. func NewRelay(meter metric.Meter, exposedAddress string, tlsSupport bool, validator auth.Validator) (*Relay, error) { - m, err := metrics.NewMetrics(meter) + ctx, metricsCancel := context.WithCancel(context.Background()) + m, err := metrics.NewMetrics(ctx, meter) if err != nil { + metricsCancel() return nil, fmt.Errorf("creating app metrics: %v", err) } r := &Relay{ - metrics: m, - validator: validator, - store: NewStore(), + metrics: m, + metricsCancel: metricsCancel, + validator: validator, + store: NewStore(), } if tlsSupport { @@ -85,15 +89,15 @@ func (r *Relay) Accept(conn net.Conn) { return } - peer := NewPeer(peerID, conn, r.store) + peer := NewPeer(r.metrics, peerID, conn, r.store) peer.log.Infof("peer connected from: %s", conn.RemoteAddr()) r.store.AddPeer(peer) - r.metrics.Peers.Add(context.Background(), 1) + r.metrics.PeerConnected(peer.String()) go func() { peer.Work() r.store.DeletePeer(peer) peer.log.Debugf("relay connection closed") - r.metrics.Peers.Add(context.Background(), -1) + r.metrics.PeerDisconnected(peer.String()) }() } @@ -112,6 +116,7 @@ func (r *Relay) Close(ctx context.Context) { }(peer) } wg.Wait() + r.metricsCancel() r.closeMu.Unlock() }