Files
netbird/proxy/internal/metrics/metrics.go

390 lines
11 KiB
Go

package metrics
import (
"context"
"net/http"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"github.com/netbirdio/netbird/proxy/internal/proxy"
"github.com/netbirdio/netbird/proxy/internal/responsewriter"
"github.com/netbirdio/netbird/proxy/internal/types"
)
// Metrics owns the proxy's OpenTelemetry instruments: HTTP request counters
// and latencies, domain/path configuration gauges, certificate issuance
// latency, and L4 (TCP/TLS/UDP) relay statistics.
//
// Construct it with New. The zero value has no instruments and a nil
// mappingPaths map, so it is not usable.
type Metrics struct {
	// ctx is the context handed to every Add/Record call made by the
	// recording methods.
	ctx context.Context

	// HTTP request counter.
	requestsTotal metric.Int64Counter

	// In-flight requests and configured domain/path gauges.
	activeRequests, configuredDomains, totalPaths metric.Int64UpDownCounter

	// Request, backend round-trip and certificate issuance latencies (ms).
	requestDuration, backendDuration, certificateIssueDuration metric.Int64Histogram

	// L4 service gauge, labelled by service mode.
	l4Services metric.Int64UpDownCounter

	// L4 TCP/TLS connection-level instruments.
	tcpActiveConns  metric.Int64UpDownCounter
	tcpConnsTotal   metric.Int64Counter
	tcpConnDuration metric.Int64Histogram
	tcpBytesTotal   metric.Int64Counter

	// L4 UDP session-level instruments.
	udpActiveSess metric.Int64UpDownCounter

	udpSessionsTotal, udpPacketsTotal, udpBytesTotal metric.Int64Counter

	// mappingsMux guards mappingPaths, which maps each configured host to the
	// path count last reported for it via AddMapping.
	mappingsMux  sync.Mutex
	mappingPaths map[string]int
}
// New builds a Metrics whose instruments are all registered on meter.
// ctx is retained and used for every subsequent measurement. The first
// instrument-creation error aborts construction and is returned unchanged.
func New(ctx context.Context, meter metric.Meter) (*Metrics, error) {
	m := &Metrics{
		ctx:          ctx,
		mappingPaths: make(map[string]int),
	}

	// HTTP instruments first, then L4 ones; order matches registration order.
	initializers := []func(metric.Meter) error{
		m.initHTTPMetrics,
		m.initL4Metrics,
	}
	for _, initialize := range initializers {
		if err := initialize(meter); err != nil {
			return nil, err
		}
	}
	return m, nil
}
// initHTTPMetrics registers the HTTP request, domain/path configuration and
// certificate issuance instruments on meter. It stops at the first creation
// error and returns it unchanged.
func (m *Metrics) initHTTPMetrics(meter metric.Meter) (err error) {
	if m.requestsTotal, err = meter.Int64Counter(
		"proxy.http.request.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Total number of requests made to the netbird proxy"),
	); err != nil {
		return err
	}
	if m.activeRequests, err = meter.Int64UpDownCounter(
		"proxy.http.active_requests",
		metric.WithUnit("1"),
		metric.WithDescription("Current in-flight requests handled by the netbird proxy"),
	); err != nil {
		return err
	}

	// Configuration gauges, adjusted by AddMapping/RemoveMapping.
	if m.configuredDomains, err = meter.Int64UpDownCounter(
		"proxy.domains.count",
		metric.WithUnit("1"),
		metric.WithDescription("Current number of domains configured on the netbird proxy"),
	); err != nil {
		return err
	}
	if m.totalPaths, err = meter.Int64UpDownCounter(
		"proxy.paths.count",
		metric.WithUnit("1"),
		metric.WithDescription("Total number of paths configured on the netbird proxy"),
	); err != nil {
		return err
	}

	// Latency histograms, recorded in whole milliseconds.
	if m.requestDuration, err = meter.Int64Histogram(
		"proxy.http.request.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of requests made to the netbird proxy"),
	); err != nil {
		return err
	}
	if m.backendDuration, err = meter.Int64Histogram(
		"proxy.backend.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of peer round trip time from the netbird proxy"),
	); err != nil {
		return err
	}
	m.certificateIssueDuration, err = meter.Int64Histogram(
		"proxy.certificate.issue.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of ACME certificate issuance"),
	)
	return err
}
// initL4Metrics registers the L4 instruments on meter: the per-mode service
// gauge, the TCP/TLS connection instruments and the UDP session instruments.
// It stops at the first creation error and returns it unchanged.
func (m *Metrics) initL4Metrics(meter metric.Meter) (err error) {
	if m.l4Services, err = meter.Int64UpDownCounter(
		"proxy.l4.services.count",
		metric.WithUnit("1"),
		metric.WithDescription("Current number of configured L4 services (TCP/TLS/UDP) by mode"),
	); err != nil {
		return err
	}

	// TCP/TLS relay connections.
	if m.tcpActiveConns, err = meter.Int64UpDownCounter(
		"proxy.tcp.active_connections",
		metric.WithUnit("1"),
		metric.WithDescription("Current number of active TCP/TLS relay connections"),
	); err != nil {
		return err
	}
	if m.tcpConnsTotal, err = meter.Int64Counter(
		"proxy.tcp.connections.total",
		metric.WithUnit("1"),
		metric.WithDescription("Total TCP/TLS relay connections by result and account"),
	); err != nil {
		return err
	}
	if m.tcpConnDuration, err = meter.Int64Histogram(
		"proxy.tcp.connection.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of TCP/TLS relay connections"),
	); err != nil {
		return err
	}
	if m.tcpBytesTotal, err = meter.Int64Counter(
		"proxy.tcp.bytes.total",
		metric.WithUnit("bytes"),
		metric.WithDescription("Total bytes transferred through TCP/TLS relay by direction"),
	); err != nil {
		return err
	}

	// UDP relay sessions.
	if m.udpActiveSess, err = meter.Int64UpDownCounter(
		"proxy.udp.active_sessions",
		metric.WithUnit("1"),
		metric.WithDescription("Current number of active UDP relay sessions"),
	); err != nil {
		return err
	}
	if m.udpSessionsTotal, err = meter.Int64Counter(
		"proxy.udp.sessions.total",
		metric.WithUnit("1"),
		metric.WithDescription("Total UDP relay sessions by result and account"),
	); err != nil {
		return err
	}
	if m.udpPacketsTotal, err = meter.Int64Counter(
		"proxy.udp.packets.total",
		metric.WithUnit("1"),
		metric.WithDescription("Total UDP packets relayed by direction"),
	); err != nil {
		return err
	}
	m.udpBytesTotal, err = meter.Int64Counter(
		"proxy.udp.bytes.total",
		metric.WithUnit("bytes"),
		metric.WithDescription("Total bytes transferred through UDP relay by direction"),
	)
	return err
}
// responseInterceptor wraps the handler's ResponseWriter to capture the final
// response status and the number of body bytes written.
type responseInterceptor struct {
	*responsewriter.PassthroughWriter
	// status is the final response status. It is 0 until a final status is
	// sent via WriteHeader or implied by the first Write.
	status int
	// size is the running total of bytes reported written by Write.
	size int
}

// WriteHeader records the first final status code and forwards the call.
// 1xx informational codes (except 101 Switching Protocols) may precede the
// final header, and net/http ignores WriteHeader calls after the final one,
// so neither case overwrites an already-recorded status.
func (w *responseInterceptor) WriteHeader(status int) {
	informational := status >= 100 && status <= 199 && status != http.StatusSwitchingProtocols
	if w.status == 0 && !informational {
		w.status = status
	}
	w.PassthroughWriter.WriteHeader(status)
}

// Write forwards b and adds the number of bytes written to size. As with
// net/http, a Write without a prior final WriteHeader implies 200 OK, so that
// status is recorded here instead of leaving it at 0.
func (w *responseInterceptor) Write(b []byte) (int, error) {
	if w.status == 0 {
		w.status = http.StatusOK
	}
	n, err := w.PassthroughWriter.Write(b)
	w.size += n
	return n, err
}

// Unwrap returns the wrapped PassthroughWriter so http.ResponseController can
// reach through this interceptor for Hijack/Flush operations.
// NOTE(review): this assumes PassthroughWriter itself exposes the original
// writer (e.g. via its own Unwrap); confirm in the responsewriter package.
func (w *responseInterceptor) Unwrap() http.ResponseWriter {
	return w.PassthroughWriter
}
// Middleware returns next wrapped with request metrics. Each request
// increments the request counter and the in-flight gauge. Once next returns
// (or panics), the gauge is decremented and the elapsed handling time is
// recorded in whole milliseconds.
func (m *Metrics) Middleware(next http.Handler) http.Handler {
	handler := func(w http.ResponseWriter, r *http.Request) {
		m.requestsTotal.Add(m.ctx, 1)
		m.activeRequests.Add(m.ctx, 1)

		wrapped := &responseInterceptor{PassthroughWriter: responsewriter.New(w)}

		began := time.Now()
		finish := func() {
			elapsed := time.Since(began)
			m.activeRequests.Add(m.ctx, -1)
			m.requestDuration.Record(m.ctx, elapsed.Milliseconds())
		}
		defer finish()

		next.ServeHTTP(wrapped, r)
	}
	return http.HandlerFunc(handler)
}
// roundTripperFunc adapts a plain function to the http.RoundTripper interface.
type roundTripperFunc func(*http.Request) (*http.Response, error)

// Compile-time check that roundTripperFunc satisfies http.RoundTripper.
var _ http.RoundTripper = roundTripperFunc(nil)

// RoundTrip calls fn with req and returns its response and error unchanged.
func (fn roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
	return fn(req)
}
// RoundTripper returns next wrapped so that the elapsed time of every call to
// next.RoundTrip is recorded in whole milliseconds on the backend duration
// histogram, whether or not the call returned an error.
func (m *Metrics) RoundTripper(next http.RoundTripper) http.RoundTripper {
	timed := func(req *http.Request) (*http.Response, error) {
		began := time.Now()
		resp, err := next.RoundTrip(req)
		m.backendDuration.Record(m.ctx, time.Since(began).Milliseconds())
		return resp, err
	}
	return roundTripperFunc(timed)
}
// AddMapping records that a domain mapping was added or updated. A host seen
// for the first time bumps the domain gauge. The path gauge is adjusted by the
// difference between the new path count and the one previously stored for the
// host (0 if the host is new).
func (m *Metrics) AddMapping(mapping proxy.Mapping) {
	m.mappingsMux.Lock()
	defer m.mappingsMux.Unlock()

	prev, known := m.mappingPaths[mapping.Host]
	if !known {
		m.configuredDomains.Add(m.ctx, 1)
	}
	if delta := len(mapping.Paths) - prev; delta != 0 {
		m.totalPaths.Add(m.ctx, int64(delta))
	}
	m.mappingPaths[mapping.Host] = len(mapping.Paths)
}
// RemoveMapping records that a domain mapping was removed. For a host tracked
// by AddMapping, it decrements the domain gauge and subtracts the stored path
// count from the path gauge. Unknown hosts are ignored.
func (m *Metrics) RemoveMapping(mapping proxy.Mapping) {
	m.mappingsMux.Lock()
	defer m.mappingsMux.Unlock()

	if paths, tracked := m.mappingPaths[mapping.Host]; tracked {
		m.configuredDomains.Add(m.ctx, -1)
		m.totalPaths.Add(m.ctx, -int64(paths))
		delete(m.mappingPaths, mapping.Host)
	}
}
// RecordCertificateIssuance adds one sample, truncated to whole milliseconds,
// to the certificate issuance latency histogram.
func (m *Metrics) RecordCertificateIssuance(duration time.Duration) {
	elapsedMs := duration.Milliseconds()
	m.certificateIssueDuration.Record(m.ctx, elapsedMs)
}
// L4ServiceAdded increments the L4 service gauge for the given mode.
func (m *Metrics) L4ServiceAdded(mode types.ServiceMode) {
	m.adjustL4Services(mode, 1)
}

// L4ServiceRemoved decrements the L4 service gauge for the given mode.
func (m *Metrics) L4ServiceRemoved(mode types.ServiceMode) {
	m.adjustL4Services(mode, -1)
}

// adjustL4Services applies delta to the L4 service gauge series labelled with
// the given mode.
func (m *Metrics) adjustL4Services(mode types.ServiceMode, delta int64) {
	modeAttr := attribute.String("mode", string(mode))
	m.l4Services.Add(m.ctx, delta, metric.WithAttributes(modeAttr))
}
// TCPRelayStarted records a new TCP relay connection. It raises the active
// connection gauge for the account and counts the connection with
// result="success".
func (m *Metrics) TCPRelayStarted(accountID types.AccountID) {
	account := attribute.String("account_id", string(accountID))
	success := attribute.String("result", "success")

	m.tcpActiveConns.Add(m.ctx, 1, metric.WithAttributes(account))
	m.tcpConnsTotal.Add(m.ctx, 1, metric.WithAttributes(account, success))
}
// TCPRelayEnded records the end of a TCP relay connection. It lowers the
// account's active connection gauge and records the connection duration in
// whole milliseconds. It also adds srcToDst bytes under
// direction="client_to_backend" and dstToSrc bytes under
// direction="backend_to_client".
func (m *Metrics) TCPRelayEnded(accountID types.AccountID, duration time.Duration, srcToDst, dstToSrc int64) {
	perAccount := metric.WithAttributes(attribute.String("account_id", string(accountID)))
	m.tcpActiveConns.Add(m.ctx, -1, perAccount)
	m.tcpConnDuration.Record(m.ctx, duration.Milliseconds(), perAccount)

	// Byte totals are labelled by direction only, not by account.
	addBytes := func(direction string, n int64) {
		m.tcpBytesTotal.Add(m.ctx, n, metric.WithAttributes(attribute.String("direction", direction)))
	}
	addBytes("client_to_backend", srcToDst)
	addBytes("backend_to_client", dstToSrc)
}
// TCPRelayDialError records a dial failure for a TCP relay.
func (m *Metrics) TCPRelayDialError(accountID types.AccountID) {
	m.countTCPConnResult(accountID, "dial_error")
}

// TCPRelayRejected records a rejected TCP relay (semaphore full).
func (m *Metrics) TCPRelayRejected(accountID types.AccountID) {
	m.countTCPConnResult(accountID, "rejected")
}

// countTCPConnResult adds one to the TCP connection counter, labelled with the
// account and the given result.
func (m *Metrics) countTCPConnResult(accountID types.AccountID, result string) {
	attrs := metric.WithAttributes(
		attribute.String("account_id", string(accountID)),
		attribute.String("result", result),
	)
	m.tcpConnsTotal.Add(m.ctx, 1, attrs)
}
// UDPSessionStarted records a new UDP relay session. It raises the account's
// active session gauge and counts the session with result="success".
func (m *Metrics) UDPSessionStarted(accountID types.AccountID) {
	account := attribute.String("account_id", string(accountID))
	success := attribute.String("result", "success")

	m.udpActiveSess.Add(m.ctx, 1, metric.WithAttributes(account))
	m.udpSessionsTotal.Add(m.ctx, 1, metric.WithAttributes(account, success))
}
// UDPSessionEnded records the end of a UDP relay session by lowering the
// account's active session gauge.
func (m *Metrics) UDPSessionEnded(accountID types.AccountID) {
	account := attribute.String("account_id", string(accountID))
	m.udpActiveSess.Add(m.ctx, -1, metric.WithAttributes(account))
}
// UDPSessionDialError records a dial failure for a UDP session.
func (m *Metrics) UDPSessionDialError(accountID types.AccountID) {
	m.countUDPSessionResult(accountID, "dial_error")
}

// UDPSessionRejected records a rejected UDP session (limit or rate limited).
func (m *Metrics) UDPSessionRejected(accountID types.AccountID) {
	m.countUDPSessionResult(accountID, "rejected")
}

// countUDPSessionResult adds one to the UDP session counter, labelled with the
// account and the given result.
func (m *Metrics) countUDPSessionResult(accountID types.AccountID, result string) {
	attrs := metric.WithAttributes(
		attribute.String("account_id", string(accountID)),
		attribute.String("result", result),
	)
	m.udpSessionsTotal.Add(m.ctx, 1, attrs)
}
// UDPPacketRelayed counts one relayed UDP packet and adds its size in bytes.
// Both counters are labelled with the relay direction.
func (m *Metrics) UDPPacketRelayed(direction types.RelayDirection, bytes int) {
	byDirection := metric.WithAttributes(attribute.String("direction", string(direction)))
	m.udpPacketsTotal.Add(m.ctx, 1, byDirection)
	m.udpBytesTotal.Add(m.ctx, int64(bytes), byDirection)
}