mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-21 08:09:55 +00:00
250 lines
6.9 KiB
Go
250 lines
6.9 KiB
Go
package metrics
|
|
|
|
import (
	"context"
	"fmt"
	"io"
	"maps"
	"slices"
	"strings"
	"sync"
	"time"

	log "github.com/sirupsen/logrus"
)
|
|
|
|
// Limits applied to the in-memory queue of InfluxDB samples.
const (
	// maxSampleAge is how long a sample may stay queued before it is
	// discarded (5 days).
	maxSampleAge = 120 * time.Hour

	// maxBufferSize bounds the estimated memory held by queued samples
	// (5 MiB); once exceeded, the oldest samples are discarded first.
	maxBufferSize = 5 << 20

	// estimatedSampleSize is a rough per-sample memory estimate
	// (measurement + tags + fields + timestamp), used to turn maxBufferSize
	// into a maximum sample count.
	estimatedSampleSize = 256
)
|
|
|
|
// influxSample is a single InfluxDB line protocol entry: a measurement name,
// its pre-formatted tag set ("key=value" pairs joined by commas), its numeric
// fields and the instant at which it was recorded.
type influxSample struct {
	measurement, tags string
	fields            map[string]float64
	timestamp         time.Time
}
|
|
|
|
// influxDBMetrics collects metric events as timestamped samples.
|
|
// Each event is recorded with its exact timestamp, pushed once, then cleared.
|
|
type influxDBMetrics struct {
|
|
mu sync.Mutex
|
|
samples []influxSample
|
|
}
|
|
|
|
func newInfluxDBMetrics() metricsImplementation {
|
|
return &influxDBMetrics{}
|
|
}
|
|
func (m *influxDBMetrics) RecordConnectionStages(
|
|
_ context.Context,
|
|
agentInfo AgentInfo,
|
|
connectionPairID string,
|
|
connectionType ConnectionType,
|
|
isReconnection bool,
|
|
timestamps ConnectionStageTimestamps,
|
|
) {
|
|
var signalingReceivedToConnection, connectionToWgHandshake, totalDuration float64
|
|
|
|
if !timestamps.SignalingReceived.IsZero() && !timestamps.ConnectionReady.IsZero() {
|
|
signalingReceivedToConnection = timestamps.ConnectionReady.Sub(timestamps.SignalingReceived).Seconds()
|
|
}
|
|
|
|
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
|
connectionToWgHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
|
|
}
|
|
|
|
if !timestamps.SignalingReceived.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
|
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.SignalingReceived).Seconds()
|
|
}
|
|
|
|
attemptType := "initial"
|
|
if isReconnection {
|
|
attemptType = "reconnection"
|
|
}
|
|
|
|
connTypeStr := connectionType.String()
|
|
tags := fmt.Sprintf("deployment_type=%s,connection_type=%s,attempt_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,connection_pair_id=%s",
|
|
agentInfo.DeploymentType.String(),
|
|
connTypeStr,
|
|
attemptType,
|
|
agentInfo.Version,
|
|
agentInfo.OS,
|
|
agentInfo.Arch,
|
|
agentInfo.peerID,
|
|
connectionPairID,
|
|
)
|
|
|
|
now := time.Now()
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.samples = append(m.samples, influxSample{
|
|
measurement: "netbird_peer_connection",
|
|
tags: tags,
|
|
fields: map[string]float64{
|
|
"signaling_to_connection_seconds": signalingReceivedToConnection,
|
|
"connection_to_wg_handshake_seconds": connectionToWgHandshake,
|
|
"total_seconds": totalDuration,
|
|
},
|
|
timestamp: now,
|
|
})
|
|
m.trimLocked()
|
|
|
|
log.Tracef("peer connection metrics [%s, %s, %s]: signalingReceived→connection: %.3fs, connection→wg_handshake: %.3fs, total: %.3fs",
|
|
agentInfo.DeploymentType.String(), connTypeStr, attemptType, signalingReceivedToConnection, connectionToWgHandshake, totalDuration)
|
|
}
|
|
|
|
func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration) {
|
|
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s",
|
|
agentInfo.DeploymentType.String(),
|
|
agentInfo.Version,
|
|
agentInfo.OS,
|
|
agentInfo.Arch,
|
|
agentInfo.peerID,
|
|
)
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.samples = append(m.samples, influxSample{
|
|
measurement: "netbird_sync",
|
|
tags: tags,
|
|
fields: map[string]float64{
|
|
"duration_seconds": duration.Seconds(),
|
|
},
|
|
timestamp: time.Now(),
|
|
})
|
|
m.trimLocked()
|
|
}
|
|
|
|
func (m *influxDBMetrics) RecordVNCSessionTick(_ context.Context, agentInfo AgentInfo, tick VNCSessionTick) {
|
|
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s",
|
|
agentInfo.DeploymentType.String(),
|
|
agentInfo.Version,
|
|
agentInfo.OS,
|
|
agentInfo.Arch,
|
|
agentInfo.peerID,
|
|
)
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.samples = append(m.samples, influxSample{
|
|
measurement: "netbird_vnc_traffic",
|
|
tags: tags,
|
|
fields: map[string]float64{
|
|
"period_seconds": tick.Period.Seconds(),
|
|
"bytes_out": float64(tick.BytesOut),
|
|
"writes": float64(tick.Writes),
|
|
"fbus": float64(tick.FBUs),
|
|
"max_fbu_bytes": float64(tick.MaxFBUBytes),
|
|
"max_fbu_rects": float64(tick.MaxFBURects),
|
|
"max_write_bytes": float64(tick.MaxWriteBytes),
|
|
"write_time_seconds": float64(tick.WriteNanos) / 1e9,
|
|
},
|
|
timestamp: time.Now(),
|
|
})
|
|
m.trimLocked()
|
|
}
|
|
|
|
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
|
|
result := "success"
|
|
if !success {
|
|
result = "failure"
|
|
}
|
|
|
|
tags := fmt.Sprintf("deployment_type=%s,result=%s,version=%s,os=%s,arch=%s,peer_id=%s",
|
|
agentInfo.DeploymentType.String(),
|
|
result,
|
|
agentInfo.Version,
|
|
agentInfo.OS,
|
|
agentInfo.Arch,
|
|
agentInfo.peerID,
|
|
)
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
m.samples = append(m.samples, influxSample{
|
|
measurement: "netbird_login",
|
|
tags: tags,
|
|
fields: map[string]float64{
|
|
"duration_seconds": duration.Seconds(),
|
|
},
|
|
timestamp: time.Now(),
|
|
})
|
|
m.trimLocked()
|
|
|
|
log.Tracef("login metrics [%s, %s]: duration=%.3fs", agentInfo.DeploymentType.String(), result, duration.Seconds())
|
|
}
|
|
|
|
// Export writes pending samples in InfluxDB line protocol format.
|
|
// Format: measurement,tag=val,tag=val field=val,field=val timestamp_ns
|
|
func (m *influxDBMetrics) Export(w io.Writer) error {
|
|
m.mu.Lock()
|
|
samples := make([]influxSample, len(m.samples))
|
|
copy(samples, m.samples)
|
|
m.mu.Unlock()
|
|
|
|
for _, s := range samples {
|
|
if _, err := fmt.Fprintf(w, "%s,%s ", s.measurement, s.tags); err != nil {
|
|
return err
|
|
}
|
|
|
|
sortedKeys := slices.Sorted(maps.Keys(s.fields))
|
|
first := true
|
|
for _, k := range sortedKeys {
|
|
if !first {
|
|
if _, err := fmt.Fprint(w, ","); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if _, err := fmt.Fprintf(w, "%s=%g", k, s.fields[k]); err != nil {
|
|
return err
|
|
}
|
|
first = false
|
|
}
|
|
|
|
if _, err := fmt.Fprintf(w, " %d\n", s.timestamp.UnixNano()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Reset clears pending samples after a successful push
|
|
func (m *influxDBMetrics) Reset() {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.samples = m.samples[:0]
|
|
}
|
|
|
|
// trimLocked removes samples that exceed age or size limits.
|
|
// Must be called with m.mu held.
|
|
func (m *influxDBMetrics) trimLocked() {
|
|
now := time.Now()
|
|
|
|
// drop samples older than maxSampleAge
|
|
cutoff := 0
|
|
for cutoff < len(m.samples) && now.Sub(m.samples[cutoff].timestamp) > maxSampleAge {
|
|
cutoff++
|
|
}
|
|
if cutoff > 0 {
|
|
copy(m.samples, m.samples[cutoff:])
|
|
m.samples = m.samples[:len(m.samples)-cutoff]
|
|
log.Debugf("influxdb metrics: dropped %d samples older than %s", cutoff, maxSampleAge)
|
|
}
|
|
|
|
// drop oldest samples if estimated size exceeds maxBufferSize
|
|
maxSamples := maxBufferSize / estimatedSampleSize
|
|
if len(m.samples) > maxSamples {
|
|
drop := len(m.samples) - maxSamples
|
|
copy(m.samples, m.samples[drop:])
|
|
m.samples = m.samples[:maxSamples]
|
|
log.Debugf("influxdb metrics: dropped %d oldest samples to stay under %d MB size limit", drop, maxBufferSize/(1024*1024))
|
|
}
|
|
}
|