Files
netbird/management/server/telemetry/grpc_metrics.go
2026-03-05 22:10:45 +01:00

261 lines
9.8 KiB
Go

package telemetry
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// AccountIDLabel is the metric attribute key used to tag data points with the account identifier.
const AccountIDLabel = "account_id"

// HighLatencyThreshold is the login request duration above which the request is counted as high-latency.
const HighLatencyThreshold = time.Second * 7
// GRPCMetrics are gRPC server metrics
type GRPCMetrics struct {
	// meter is kept so that additional instruments/callbacks (e.g. RegisterConnectedStreams) can be registered later.
	meter metric.Meter

	// Request counters for the individual management gRPC endpoints.
	syncRequestsCounter            metric.Int64Counter
	syncRequestsBlockedCounter     metric.Int64Counter
	loginRequestsCounter           metric.Int64Counter
	loginRequestsBlockedCounter    metric.Int64Counter
	loginRequestHighLatencyCounter metric.Int64Counter
	getKeyRequestsCounter          metric.Int64Counter

	// activeStreamsGauge reports the number of connected peer streams via a registered callback.
	activeStreamsGauge metric.Int64ObservableGauge

	// Request-duration histograms (milliseconds); the *P95ByAccount variants receive
	// one data point per account per flush interval.
	syncRequestDuration              metric.Int64Histogram
	syncRequestDurationP95ByAccount  metric.Int64Histogram
	loginRequestDuration             metric.Int64Histogram
	loginRequestDurationP95ByAccount metric.Int64Histogram

	// channelQueueLength tracks the distribution of the update-channel queue length.
	channelQueueLength metric.Int64Histogram

	// ctx is used for all instrument recordings and cancels the background P95 flushers.
	ctx context.Context

	// Per-account aggregation
	syncDurationAggregator  *AccountDurationAggregator
	loginDurationAggregator *AccountDurationAggregator
}
// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server.
// It also starts the background goroutines that periodically flush per-account P95 latencies
// into the corresponding histograms; both goroutines exit when ctx is canceled.
func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, error) {
	syncRequestsCounter, err := meter.Int64Counter("management.grpc.sync.request.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of sync gRPC requests from the peers to establish a connection and receive network map updates (update channel)"),
	)
	if err != nil {
		return nil, err
	}

	syncRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.sync.request.blocked.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of sync gRPC requests from blocked peers"),
	)
	if err != nil {
		return nil, err
	}

	loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
	)
	if err != nil {
		return nil, err
	}

	loginRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.login.request.blocked.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of login gRPC requests from blocked peers"),
	)
	if err != nil {
		return nil, err
	}

	loginRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.login.request.high.latency.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of login gRPC requests from the peers that took longer than the threshold to authenticate and receive initial configuration and relay credentials"),
	)
	if err != nil {
		return nil, err
	}

	getKeyRequestsCounter, err := meter.Int64Counter("management.grpc.key.request.counter",
		metric.WithUnit("1"),
		metric.WithDescription("Number of key gRPC requests from the peers to get the server's public WireGuard key"),
	)
	if err != nil {
		return nil, err
	}

	activeStreamsGauge, err := meter.Int64ObservableGauge("management.grpc.connected.streams",
		metric.WithUnit("1"),
		metric.WithDescription("Number of active peer streams connected to the gRPC server"),
	)
	if err != nil {
		return nil, err
	}

	syncRequestDuration, err := meter.Int64Histogram("management.grpc.sync.request.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of the sync gRPC requests from the peers to establish a connection and receive network map updates (update channel)"),
	)
	if err != nil {
		return nil, err
	}

	syncRequestDurationP95ByAccount, err := meter.Int64Histogram("management.grpc.sync.request.duration.p95.by.account.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("P95 duration of sync requests aggregated per account - each data point represents one account's P95"),
	)
	if err != nil {
		return nil, err
	}

	loginRequestDuration, err := meter.Int64Histogram("management.grpc.login.request.duration.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("Duration of the login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
	)
	if err != nil {
		return nil, err
	}

	loginRequestDurationP95ByAccount, err := meter.Int64Histogram("management.grpc.login.request.duration.p95.by.account.ms",
		metric.WithUnit("milliseconds"),
		metric.WithDescription("P95 duration of login requests aggregated per account - each data point represents one account's P95"),
	)
	if err != nil {
		return nil, err
	}

	// We use a histogram here as we have multiple channels at the same time and we want to see a slice at any given time.
	// Then we should be able to extract min, max, mean and the percentiles.
	// TODO(yury): This needs custom bucketing as we are interested in the values from 0 to server.channelBufferSize (100)
	channelQueue, err := meter.Int64Histogram(
		"management.grpc.updatechannel.queue",
		metric.WithDescription("Number of update messages piling up in the update channel queue"),
		metric.WithUnit("length"),
	)
	if err != nil {
		return nil, err
	}

	syncDurationAggregator := NewAccountDurationAggregator(ctx, 60*time.Second, 5*time.Minute)
	loginDurationAggregator := NewAccountDurationAggregator(ctx, 60*time.Second, 5*time.Minute)

	grpcMetrics := &GRPCMetrics{
		meter:                            meter,
		syncRequestsCounter:              syncRequestsCounter,
		syncRequestsBlockedCounter:       syncRequestsBlockedCounter,
		loginRequestsCounter:             loginRequestsCounter,
		loginRequestsBlockedCounter:      loginRequestsBlockedCounter,
		loginRequestHighLatencyCounter:   loginRequestHighLatencyCounter,
		getKeyRequestsCounter:            getKeyRequestsCounter,
		activeStreamsGauge:               activeStreamsGauge,
		syncRequestDuration:              syncRequestDuration,
		syncRequestDurationP95ByAccount:  syncRequestDurationP95ByAccount,
		loginRequestDuration:             loginRequestDuration,
		loginRequestDurationP95ByAccount: loginRequestDurationP95ByAccount,
		channelQueueLength:               channelQueue,
		ctx:                              ctx,
		syncDurationAggregator:           syncDurationAggregator,
		loginDurationAggregator:          loginDurationAggregator,
	}

	// Background flushers publish per-account P95s on each aggregator's flush interval;
	// they stop when ctx is canceled.
	go grpcMetrics.startSyncP95Flusher()
	go grpcMetrics.startLoginP95Flusher()

	// All errors were handled above; return an explicit nil instead of the stale err variable.
	return grpcMetrics, nil
}
// CountSyncRequest counts the number of gRPC sync requests coming to the gRPC API.
func (grpcMetrics *GRPCMetrics) CountSyncRequest() {
	ctx := grpcMetrics.ctx
	grpcMetrics.syncRequestsCounter.Add(ctx, 1)
}
// CountSyncRequestBlocked counts the number of gRPC sync requests from blocked peers.
func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() {
	ctx := grpcMetrics.ctx
	grpcMetrics.syncRequestsBlockedCounter.Add(ctx, 1)
}
// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API.
func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() {
	ctx := grpcMetrics.ctx
	grpcMetrics.getKeyRequestsCounter.Add(ctx, 1)
}
// CountLoginRequest counts the number of gRPC login requests coming to the gRPC API.
func (grpcMetrics *GRPCMetrics) CountLoginRequest() {
	ctx := grpcMetrics.ctx
	grpcMetrics.loginRequestsCounter.Add(ctx, 1)
}
// CountLoginRequestBlocked counts the number of gRPC login requests from blocked peers.
func (grpcMetrics *GRPCMetrics) CountLoginRequestBlocked() {
	ctx := grpcMetrics.ctx
	grpcMetrics.loginRequestsBlockedCounter.Add(ctx, 1)
}
// CountLoginRequestDuration counts the duration of the login gRPC requests.
// The duration is recorded in the login histogram and fed into the per-account
// aggregator; requests slower than HighLatencyThreshold additionally increment
// the high-latency counter, labeled with the account ID.
func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration, accountID string) {
	ctx := grpcMetrics.ctx
	grpcMetrics.loginRequestDuration.Record(ctx, duration.Milliseconds())
	grpcMetrics.loginDurationAggregator.Record(accountID, duration)
	if duration <= HighLatencyThreshold {
		return
	}
	accountAttr := metric.WithAttributes(attribute.String(AccountIDLabel, accountID))
	grpcMetrics.loginRequestHighLatencyCounter.Add(ctx, 1, accountAttr)
}
// CountSyncRequestDuration counts the duration of the sync gRPC requests.
// The duration is recorded in the sync histogram and fed into the per-account aggregator.
func (grpcMetrics *GRPCMetrics) CountSyncRequestDuration(duration time.Duration, accountID string) {
	ctx := grpcMetrics.ctx
	grpcMetrics.syncRequestDuration.Record(ctx, duration.Milliseconds())
	grpcMetrics.syncDurationAggregator.Record(accountID, duration)
}
// startSyncP95Flusher periodically flushes per-account sync P95 values to the
// histogram. It runs until the metrics context is canceled.
func (grpcMetrics *GRPCMetrics) startSyncP95Flusher() {
	ticker := time.NewTicker(grpcMetrics.syncDurationAggregator.FlushInterval)
	defer ticker.Stop()

	done := grpcMetrics.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// Each flushed value is one account's P95 for the elapsed window.
			for _, accountP95 := range grpcMetrics.syncDurationAggregator.FlushAndGetP95s() {
				grpcMetrics.syncRequestDurationP95ByAccount.Record(grpcMetrics.ctx, accountP95)
			}
		}
	}
}
// startLoginP95Flusher periodically flushes per-account login P95 values to the
// histogram. It runs until the metrics context is canceled.
func (grpcMetrics *GRPCMetrics) startLoginP95Flusher() {
	ticker := time.NewTicker(grpcMetrics.loginDurationAggregator.FlushInterval)
	defer ticker.Stop()

	done := grpcMetrics.ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			// Each flushed value is one account's P95 for the elapsed window.
			for _, accountP95 := range grpcMetrics.loginDurationAggregator.FlushAndGetP95s() {
				grpcMetrics.loginRequestDurationP95ByAccount.Record(grpcMetrics.ctx, accountP95)
			}
		}
	}
}
// RegisterConnectedStreams registers a function that collects number of active streams
// and feeds it to the metrics gauge.
func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error {
	// The callback is invoked by the SDK on each collection cycle; the observed
	// value comes from the caller-supplied producer.
	collect := func(_ context.Context, observer metric.Observer) error {
		observer.ObserveInt64(grpcMetrics.activeStreamsGauge, producer())
		return nil
	}
	_, err := grpcMetrics.meter.RegisterCallback(collect, grpcMetrics.activeStreamsGauge)
	return err
}
// UpdateChannelQueueLength updates the histogram that keeps the distribution of the
// update messages channel queue length.
// Receiver renamed from "metrics" to "grpcMetrics" for consistency with every other
// method on this type (Go convention: one receiver name per type).
func (grpcMetrics *GRPCMetrics) UpdateChannelQueueLength(length int) {
	grpcMetrics.channelQueueLength.Record(grpcMetrics.ctx, int64(length))
}