switch proxy to use opentelemetry

This commit is contained in:
pascal
2026-03-07 11:16:40 +01:00
parent 5c20f13c48
commit c2fec57c0f
3 changed files with 121 additions and 93 deletions

View File

@@ -1,64 +1,90 @@
package metrics package metrics
import ( import (
"context"
"net/http" "net/http"
"strconv"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/metric"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/netbirdio/netbird/proxy/internal/proxy" "github.com/netbirdio/netbird/proxy/internal/proxy"
"github.com/netbirdio/netbird/proxy/internal/responsewriter" "github.com/netbirdio/netbird/proxy/internal/responsewriter"
) )
type Metrics struct { type Metrics struct {
requestsTotal prometheus.Counter ctx context.Context
activeRequests prometheus.Gauge requestsTotal metric.Int64Counter
configuredDomains prometheus.Gauge activeRequests metric.Int64UpDownCounter
pathsPerDomain *prometheus.GaugeVec configuredDomains metric.Int64UpDownCounter
requestDuration *prometheus.HistogramVec totalPaths metric.Int64UpDownCounter
backendDuration *prometheus.HistogramVec requestDuration metric.Int64Histogram
backendDuration metric.Int64Histogram
} }
func New(reg prometheus.Registerer) *Metrics { func New(ctx context.Context, meter metric.Meter) (*Metrics, error) {
promFactory := promauto.With(reg) requestsTotal, err := meter.Int64Counter(
return &Metrics{ "proxy.http.request.counter",
requestsTotal: promFactory.NewCounter(prometheus.CounterOpts{ metric.WithUnit("1"),
Name: "netbird_proxy_requests_total", metric.WithDescription("Total number of requests made to the netbird proxy"),
Help: "Total number of requests made to the netbird proxy", )
}), if err != nil {
activeRequests: promFactory.NewGauge(prometheus.GaugeOpts{ return nil, err
Name: "netbird_proxy_active_requests_count",
Help: "Current in-flight requests handled by the netbird proxy",
}),
configuredDomains: promFactory.NewGauge(prometheus.GaugeOpts{
Name: "netbird_proxy_domains_count",
Help: "Current number of domains configured on the netbird proxy",
}),
pathsPerDomain: promFactory.NewGaugeVec(
prometheus.GaugeOpts{
Name: "netbird_proxy_paths_count",
Help: "Current number of paths configured on the netbird proxy labelled by domain",
},
[]string{"domain"},
),
requestDuration: promFactory.NewHistogramVec(
prometheus.HistogramOpts{
Name: "netbird_proxy_request_duration_seconds",
Help: "Duration of requests made to the netbird proxy",
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
},
[]string{"status", "size", "method", "host", "path"},
),
backendDuration: promFactory.NewHistogramVec(prometheus.HistogramOpts{
Name: "netbird_proxy_backend_duration_seconds",
Help: "Duration of peer round trip time from the netbird proxy",
Buckets: []float64{0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
},
[]string{"status", "size", "method", "host", "path"},
),
} }
activeRequests, err := meter.Int64UpDownCounter(
"proxy.http.active_requests",
metric.WithUnit("1"),
metric.WithDescription("Current in-flight requests handled by the netbird proxy"),
)
if err != nil {
return nil, err
}
configuredDomains, err := meter.Int64UpDownCounter(
"proxy.domains.count",
metric.WithUnit("1"),
metric.WithDescription("Current number of domains configured on the netbird proxy"),
)
if err != nil {
return nil, err
}
totalPaths, err := meter.Int64UpDownCounter(
"proxy.paths.count",
metric.WithUnit("1"),
metric.WithDescription("Total number of paths configured on the netbird proxy"),
)
if err != nil {
return nil, err
}
requestDuration, err := meter.Int64Histogram(
"proxy.http.request.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of requests made to the netbird proxy"),
)
if err != nil {
return nil, err
}
backendDuration, err := meter.Int64Histogram(
"proxy.backend.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of peer round trip time from the netbird proxy"),
)
if err != nil {
return nil, err
}
return &Metrics{
ctx: ctx,
requestsTotal: requestsTotal,
activeRequests: activeRequests,
configuredDomains: configuredDomains,
totalPaths: totalPaths,
requestDuration: requestDuration,
backendDuration: backendDuration,
}, nil
} }
type responseInterceptor struct { type responseInterceptor struct {
@@ -80,8 +106,8 @@ func (w *responseInterceptor) Write(b []byte) (int, error) {
func (m *Metrics) Middleware(next http.Handler) http.Handler { func (m *Metrics) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
m.requestsTotal.Inc() m.requestsTotal.Add(m.ctx, 1)
m.activeRequests.Inc() m.activeRequests.Add(m.ctx, 1)
interceptor := &responseInterceptor{PassthroughWriter: responsewriter.New(w)} interceptor := &responseInterceptor{PassthroughWriter: responsewriter.New(w)}
@@ -89,14 +115,8 @@ func (m *Metrics) Middleware(next http.Handler) http.Handler {
next.ServeHTTP(interceptor, r) next.ServeHTTP(interceptor, r)
duration := time.Since(start) duration := time.Since(start)
m.activeRequests.Desc() m.activeRequests.Add(m.ctx, -1)
m.requestDuration.With(prometheus.Labels{ m.requestDuration.Record(m.ctx, duration.Milliseconds())
"status": strconv.Itoa(interceptor.status),
"size": strconv.Itoa(interceptor.size),
"method": r.Method,
"host": r.Host,
"path": r.URL.Path,
}).Observe(duration.Seconds())
}) })
} }
@@ -108,44 +128,22 @@ func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
func (m *Metrics) RoundTripper(next http.RoundTripper) http.RoundTripper { func (m *Metrics) RoundTripper(next http.RoundTripper) http.RoundTripper {
return roundTripperFunc(func(req *http.Request) (*http.Response, error) { return roundTripperFunc(func(req *http.Request) (*http.Response, error) {
labels := prometheus.Labels{
"method": req.Method,
"host": req.Host,
// Fill potentially empty labels with default values to avoid cardinality issues.
"path": "/",
"status": "0",
"size": "0",
}
if req.URL != nil {
labels["path"] = req.URL.Path
}
start := time.Now() start := time.Now()
res, err := next.RoundTrip(req) res, err := next.RoundTrip(req)
duration := time.Since(start) duration := time.Since(start)
// Not all labels will be available if there was an error. m.backendDuration.Record(m.ctx, duration.Milliseconds())
if res != nil {
labels["status"] = strconv.Itoa(res.StatusCode)
labels["size"] = strconv.Itoa(int(res.ContentLength))
}
m.backendDuration.With(labels).Observe(duration.Seconds())
return res, err return res, err
}) })
} }
func (m *Metrics) AddMapping(mapping proxy.Mapping) { func (m *Metrics) AddMapping(mapping proxy.Mapping) {
m.configuredDomains.Inc() m.configuredDomains.Add(m.ctx, 1)
m.pathsPerDomain.With(prometheus.Labels{ m.totalPaths.Add(m.ctx, int64(len(mapping.Paths)))
"domain": mapping.Host,
}).Set(float64(len(mapping.Paths)))
} }
func (m *Metrics) RemoveMapping(mapping proxy.Mapping) { func (m *Metrics) RemoveMapping(mapping proxy.Mapping) {
m.configuredDomains.Dec() m.configuredDomains.Add(m.ctx, -1)
m.pathsPerDomain.With(prometheus.Labels{ m.totalPaths.Add(m.ctx, -int64(len(mapping.Paths)))
"domain": mapping.Host,
}).Set(0)
} }

View File

@@ -1,13 +1,17 @@
package metrics_test package metrics_test
import ( import (
"context"
"net/http" "net/http"
"net/url" "net/url"
"reflect"
"testing" "testing"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"github.com/netbirdio/netbird/proxy/internal/metrics" "github.com/netbirdio/netbird/proxy/internal/metrics"
"github.com/prometheus/client_golang/prometheus"
) )
type testRoundTripper struct { type testRoundTripper struct {
@@ -47,7 +51,19 @@ func TestMetrics_RoundTripper(t *testing.T) {
}, },
} }
m := metrics.New(prometheus.NewRegistry()) exporter, err := prometheus.New()
if err != nil {
t.Fatalf("create prometheus exporter: %v", err)
}
provider := metric.NewMeterProvider(metric.WithReader(exporter))
pkg := reflect.TypeOf(metrics.Metrics{}).PkgPath()
meter := provider.Meter(pkg)
m, err := metrics.New(context.Background(), meter)
if err != nil {
t.Fatalf("create metrics: %v", err)
}
for name, test := range tests { for name, test := range tests {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {

View File

@@ -19,14 +19,17 @@ import (
"net/netip" "net/netip"
"net/url" "net/url"
"path/filepath" "path/filepath"
"reflect"
"sync" "sync"
"time" "time"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
"github.com/pires/go-proxyproto" "github.com/pires/go-proxyproto"
"github.com/prometheus/client_golang/prometheus" prometheus2 "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
@@ -42,7 +45,7 @@ import (
proxygrpc "github.com/netbirdio/netbird/proxy/internal/grpc" proxygrpc "github.com/netbirdio/netbird/proxy/internal/grpc"
"github.com/netbirdio/netbird/proxy/internal/health" "github.com/netbirdio/netbird/proxy/internal/health"
"github.com/netbirdio/netbird/proxy/internal/k8s" "github.com/netbirdio/netbird/proxy/internal/k8s"
"github.com/netbirdio/netbird/proxy/internal/metrics" proxymetrics "github.com/netbirdio/netbird/proxy/internal/metrics"
"github.com/netbirdio/netbird/proxy/internal/proxy" "github.com/netbirdio/netbird/proxy/internal/proxy"
"github.com/netbirdio/netbird/proxy/internal/roundtrip" "github.com/netbirdio/netbird/proxy/internal/roundtrip"
"github.com/netbirdio/netbird/proxy/internal/types" "github.com/netbirdio/netbird/proxy/internal/types"
@@ -63,7 +66,7 @@ type Server struct {
debug *http.Server debug *http.Server
healthServer *health.Server healthServer *health.Server
healthChecker *health.Checker healthChecker *health.Checker
meter *metrics.Metrics meter *proxymetrics.Metrics
// hijackTracker tracks hijacked connections (e.g. WebSocket upgrades) // hijackTracker tracks hijacked connections (e.g. WebSocket upgrades)
// so they can be closed during graceful shutdown, since http.Server.Shutdown // so they can be closed during graceful shutdown, since http.Server.Shutdown
@@ -152,8 +155,19 @@ func (s *Server) NotifyCertificateIssued(ctx context.Context, accountID, service
func (s *Server) ListenAndServe(ctx context.Context, addr string) (err error) { func (s *Server) ListenAndServe(ctx context.Context, addr string) (err error) {
s.initDefaults() s.initDefaults()
reg := prometheus.NewRegistry() exporter, err := prometheus.New()
s.meter = metrics.New(reg) if err != nil {
return fmt.Errorf("create prometheus exporter: %w", err)
}
provider := metric.NewMeterProvider(metric.WithReader(exporter))
pkg := reflect.TypeOf(Server{}).PkgPath()
meter := provider.Meter(pkg)
s.meter, err = proxymetrics.New(ctx, meter)
if err != nil {
return fmt.Errorf("create metrics: %w", err)
}
mgmtConn, err := s.dialManagement() mgmtConn, err := s.dialManagement()
if err != nil { if err != nil {
@@ -193,7 +207,7 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) (err error) {
s.startDebugEndpoint() s.startDebugEndpoint()
if err := s.startHealthServer(reg); err != nil { if err := s.startHealthServer(); err != nil {
return err return err
} }
@@ -284,12 +298,12 @@ func (s *Server) startDebugEndpoint() {
} }
// startHealthServer launches the health probe and metrics server. // startHealthServer launches the health probe and metrics server.
func (s *Server) startHealthServer(reg *prometheus.Registry) error { func (s *Server) startHealthServer() error {
healthAddr := s.HealthAddress healthAddr := s.HealthAddress
if healthAddr == "" { if healthAddr == "" {
healthAddr = defaultHealthAddr healthAddr = defaultHealthAddr
} }
s.healthServer = health.NewServer(healthAddr, s.healthChecker, s.Logger, promhttp.HandlerFor(reg, promhttp.HandlerOpts{})) s.healthServer = health.NewServer(healthAddr, s.healthChecker, s.Logger, promhttp.HandlerFor(prometheus2.DefaultGatherer, promhttp.HandlerOpts{EnableOpenMetrics: true}))
healthListener, err := net.Listen("tcp", healthAddr) healthListener, err := net.Listen("tcp", healthAddr)
if err != nil { if err != nil {
return fmt.Errorf("health probe server listen on %s: %w", healthAddr, err) return fmt.Errorf("health probe server listen on %s: %w", healthAddr, err)