mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-21 01:36:46 +00:00
Compare commits
14 Commits
feature/up
...
feature/cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e276a40d9 | ||
|
|
3753bf7fc4 | ||
|
|
bec58b85b1 | ||
|
|
ca3e6d93d3 | ||
|
|
80abddb78a | ||
|
|
138e728427 | ||
|
|
e7283a8198 | ||
|
|
08295e5116 | ||
|
|
cbfde79dd8 | ||
|
|
5169129029 | ||
|
|
5f02a48737 | ||
|
|
bb377a2885 | ||
|
|
e3a5c44d37 | ||
|
|
c5eb5ba1c6 |
@@ -219,6 +219,11 @@ const (
|
|||||||
darwinStdoutLogPath = "/var/log/netbird.err.log"
|
darwinStdoutLogPath = "/var/log/netbird.err.log"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MetricsExporter is an interface for exporting metrics
|
||||||
|
type MetricsExporter interface {
|
||||||
|
Export(w io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
type BundleGenerator struct {
|
type BundleGenerator struct {
|
||||||
anonymizer *anonymize.Anonymizer
|
anonymizer *anonymize.Anonymizer
|
||||||
|
|
||||||
@@ -229,6 +234,7 @@ type BundleGenerator struct {
|
|||||||
logPath string
|
logPath string
|
||||||
cpuProfile []byte
|
cpuProfile []byte
|
||||||
refreshStatus func() // Optional callback to refresh status before bundle generation
|
refreshStatus func() // Optional callback to refresh status before bundle generation
|
||||||
|
clientMetrics MetricsExporter
|
||||||
|
|
||||||
anonymize bool
|
anonymize bool
|
||||||
includeSystemInfo bool
|
includeSystemInfo bool
|
||||||
@@ -250,6 +256,7 @@ type GeneratorDependencies struct {
|
|||||||
LogPath string
|
LogPath string
|
||||||
CPUProfile []byte
|
CPUProfile []byte
|
||||||
RefreshStatus func() // Optional callback to refresh status before bundle generation
|
RefreshStatus func() // Optional callback to refresh status before bundle generation
|
||||||
|
ClientMetrics MetricsExporter
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
|
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
|
||||||
@@ -268,6 +275,7 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
|
|||||||
logPath: deps.LogPath,
|
logPath: deps.LogPath,
|
||||||
cpuProfile: deps.CPUProfile,
|
cpuProfile: deps.CPUProfile,
|
||||||
refreshStatus: deps.RefreshStatus,
|
refreshStatus: deps.RefreshStatus,
|
||||||
|
clientMetrics: deps.ClientMetrics,
|
||||||
|
|
||||||
anonymize: cfg.Anonymize,
|
anonymize: cfg.Anonymize,
|
||||||
includeSystemInfo: cfg.IncludeSystemInfo,
|
includeSystemInfo: cfg.IncludeSystemInfo,
|
||||||
@@ -351,6 +359,10 @@ func (g *BundleGenerator) createArchive() error {
|
|||||||
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
|
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := g.addMetrics(); err != nil {
|
||||||
|
log.Errorf("failed to add metrics to debug bundle: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := g.addWgShow(); err != nil {
|
if err := g.addWgShow(); err != nil {
|
||||||
log.Errorf("failed to add wg show output: %v", err)
|
log.Errorf("failed to add wg show output: %v", err)
|
||||||
}
|
}
|
||||||
@@ -744,6 +756,25 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *BundleGenerator) addMetrics() error {
|
||||||
|
if g.clientMetrics == nil {
|
||||||
|
log.Debugf("skipping metrics in debug bundle: no metrics collector")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
if err := g.clientMetrics.Export(&buf); err != nil {
|
||||||
|
return fmt.Errorf("export metrics: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := g.addFileToZip(&buf, "metrics.txt"); err != nil {
|
||||||
|
return fmt.Errorf("add metrics file to zip: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("added metrics to debug bundle")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (g *BundleGenerator) addLogfile() error {
|
func (g *BundleGenerator) addLogfile() error {
|
||||||
if g.logPath == "" {
|
if g.logPath == "" {
|
||||||
log.Debugf("skipping empty log file in debug bundle")
|
log.Debugf("skipping empty log file in debug bundle")
|
||||||
|
|||||||
@@ -36,6 +36,7 @@ import (
|
|||||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||||
"github.com/netbirdio/netbird/client/internal/netflow"
|
"github.com/netbirdio/netbird/client/internal/netflow"
|
||||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||||
"github.com/netbirdio/netbird/client/internal/networkmonitor"
|
"github.com/netbirdio/netbird/client/internal/networkmonitor"
|
||||||
@@ -65,6 +66,7 @@ import (
|
|||||||
signal "github.com/netbirdio/netbird/shared/signal/client"
|
signal "github.com/netbirdio/netbird/shared/signal/client"
|
||||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
"github.com/netbirdio/netbird/util"
|
"github.com/netbirdio/netbird/util"
|
||||||
|
"github.com/netbirdio/netbird/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||||
@@ -221,6 +223,9 @@ type Engine struct {
|
|||||||
|
|
||||||
probeStunTurn *relay.StunTurnProbe
|
probeStunTurn *relay.StunTurnProbe
|
||||||
|
|
||||||
|
// clientMetrics collects and pushes metrics
|
||||||
|
clientMetrics *metrics.ClientMetrics
|
||||||
|
|
||||||
jobExecutor *jobexec.Executor
|
jobExecutor *jobexec.Executor
|
||||||
jobExecutorWG sync.WaitGroup
|
jobExecutorWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
@@ -248,6 +253,12 @@ func NewEngine(
|
|||||||
checks []*mgmProto.Checks,
|
checks []*mgmProto.Checks,
|
||||||
stateManager *statemanager.Manager,
|
stateManager *statemanager.Manager,
|
||||||
) *Engine {
|
) *Engine {
|
||||||
|
// Initialize metrics based on deployment type
|
||||||
|
var deploymentType metrics.DeploymentType
|
||||||
|
if mgmClient != nil {
|
||||||
|
deploymentType = metrics.DetermineDeploymentType(mgmClient.GetServerURL())
|
||||||
|
}
|
||||||
|
|
||||||
engine := &Engine{
|
engine := &Engine{
|
||||||
clientCtx: clientCtx,
|
clientCtx: clientCtx,
|
||||||
clientCancel: clientCancel,
|
clientCancel: clientCancel,
|
||||||
@@ -270,6 +281,10 @@ func NewEngine(
|
|||||||
jobExecutor: jobexec.NewExecutor(),
|
jobExecutor: jobexec.NewExecutor(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
engine.clientMetrics = metrics.NewClientMetrics(metrics.AgentInfo{
|
||||||
|
DeploymentType: deploymentType,
|
||||||
|
Version: version.NetbirdVersion()})
|
||||||
|
|
||||||
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
|
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
|
||||||
return engine
|
return engine
|
||||||
}
|
}
|
||||||
@@ -830,7 +845,9 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
|
|||||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.Infof("sync finished in %s", time.Since(started))
|
duration := time.Since(started)
|
||||||
|
log.Infof("sync finished in %s", duration)
|
||||||
|
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
||||||
}()
|
}()
|
||||||
e.syncMsgMux.Lock()
|
e.syncMsgMux.Lock()
|
||||||
defer e.syncMsgMux.Unlock()
|
defer e.syncMsgMux.Unlock()
|
||||||
@@ -1077,6 +1094,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobR
|
|||||||
StatusRecorder: e.statusRecorder,
|
StatusRecorder: e.statusRecorder,
|
||||||
SyncResponse: syncResponse,
|
SyncResponse: syncResponse,
|
||||||
LogPath: e.config.LogPath,
|
LogPath: e.config.LogPath,
|
||||||
|
ClientMetrics: e.clientMetrics,
|
||||||
RefreshStatus: func() {
|
RefreshStatus: func() {
|
||||||
e.RunHealthProbes(true)
|
e.RunHealthProbes(true)
|
||||||
},
|
},
|
||||||
@@ -1532,12 +1550,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
|
|||||||
}
|
}
|
||||||
|
|
||||||
serviceDependencies := peer.ServiceDependencies{
|
serviceDependencies := peer.ServiceDependencies{
|
||||||
StatusRecorder: e.statusRecorder,
|
StatusRecorder: e.statusRecorder,
|
||||||
Signaler: e.signaler,
|
Signaler: e.signaler,
|
||||||
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
||||||
RelayManager: e.relayManager,
|
RelayManager: e.relayManager,
|
||||||
SrWatcher: e.srWatcher,
|
SrWatcher: e.srWatcher,
|
||||||
Semaphore: e.connSemaphore,
|
Semaphore: e.connSemaphore,
|
||||||
|
MetricsRecorder: e.clientMetrics,
|
||||||
}
|
}
|
||||||
peerConn, err := peer.NewConn(config, serviceDependencies)
|
peerConn, err := peer.NewConn(config, serviceDependencies)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1823,6 +1842,11 @@ func (e *Engine) GetFirewallManager() firewallManager.Manager {
|
|||||||
return e.firewall
|
return e.firewall
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetClientMetrics returns the client metrics
|
||||||
|
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
|
||||||
|
return e.clientMetrics
|
||||||
|
}
|
||||||
|
|
||||||
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
|
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
|
||||||
iface, err := net.InterfaceByName(ifaceName)
|
iface, err := net.InterfaceByName(ifaceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
17
client/internal/metrics/connection_type.go
Normal file
17
client/internal/metrics/connection_type.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
// ConnectionType represents the type of peer connection
|
||||||
|
type ConnectionType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
|
||||||
|
ConnectionTypeICE ConnectionType = "ice"
|
||||||
|
|
||||||
|
// ConnectionTypeRelay represents a relayed connection
|
||||||
|
ConnectionTypeRelay ConnectionType = "relay"
|
||||||
|
)
|
||||||
|
|
||||||
|
// String returns the string representation of the connection type
|
||||||
|
func (c ConnectionType) String() string {
|
||||||
|
return string(c)
|
||||||
|
}
|
||||||
46
client/internal/metrics/deployment_type.go
Normal file
46
client/internal/metrics/deployment_type.go
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DeploymentType represents the type of NetBird deployment
|
||||||
|
type DeploymentType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
|
||||||
|
DeploymentTypeUnknown DeploymentType = iota
|
||||||
|
|
||||||
|
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
|
||||||
|
DeploymentTypeCloud
|
||||||
|
|
||||||
|
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
|
||||||
|
DeploymentTypeSelfHosted
|
||||||
|
)
|
||||||
|
|
||||||
|
// String returns the string representation of the deployment type
|
||||||
|
func (d DeploymentType) String() string {
|
||||||
|
switch d {
|
||||||
|
case DeploymentTypeCloud:
|
||||||
|
return "cloud"
|
||||||
|
case DeploymentTypeSelfHosted:
|
||||||
|
return "selfhosted"
|
||||||
|
default:
|
||||||
|
return "selfhosted"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
|
||||||
|
// based on the management URL string
|
||||||
|
func DetermineDeploymentType(managementURL string) DeploymentType {
|
||||||
|
if managementURL == "" {
|
||||||
|
return DeploymentTypeUnknown
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for NetBird cloud API domain
|
||||||
|
if strings.Contains(strings.ToLower(managementURL), "api.netbird.io") {
|
||||||
|
return DeploymentTypeCloud
|
||||||
|
}
|
||||||
|
|
||||||
|
return DeploymentTypeSelfHosted
|
||||||
|
}
|
||||||
77
client/internal/metrics/metrics.go
Normal file
77
client/internal/metrics/metrics.go
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// AgentInfo holds static information about the agent
|
||||||
|
type AgentInfo struct {
|
||||||
|
DeploymentType DeploymentType
|
||||||
|
Version string
|
||||||
|
}
|
||||||
|
|
||||||
|
// metricsImplementation defines the internal interface for metrics implementations
|
||||||
|
type metricsImplementation interface {
|
||||||
|
// RecordConnectionStages records connection stage metrics from timestamps
|
||||||
|
RecordConnectionStages(
|
||||||
|
ctx context.Context,
|
||||||
|
connectionType ConnectionType,
|
||||||
|
isReconnection bool,
|
||||||
|
timestamps ConnectionStageTimestamps,
|
||||||
|
)
|
||||||
|
|
||||||
|
// RecordSyncDuration records how long it took to process a sync message
|
||||||
|
RecordSyncDuration(ctx context.Context, duration time.Duration)
|
||||||
|
|
||||||
|
// Export exports metrics in Prometheus format
|
||||||
|
Export(w io.Writer) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClientMetrics struct {
|
||||||
|
impl metricsImplementation
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectionStageTimestamps holds timestamps for each connection stage
|
||||||
|
type ConnectionStageTimestamps struct {
|
||||||
|
Created time.Time
|
||||||
|
SemaphoreAcquired time.Time
|
||||||
|
Signaling time.Time // First signal sent (initial) or signal received (reconnection)
|
||||||
|
ConnectionReady time.Time
|
||||||
|
WgHandshakeSuccess time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewClientMetrics creates a new ClientMetrics instance
|
||||||
|
func NewClientMetrics(agentInfo AgentInfo) *ClientMetrics {
|
||||||
|
return &ClientMetrics{impl: newVictoriaMetrics(agentInfo)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordConnectionStages calculates stage durations from timestamps and records them
|
||||||
|
func (c *ClientMetrics) RecordConnectionStages(
|
||||||
|
ctx context.Context,
|
||||||
|
connectionType ConnectionType,
|
||||||
|
isReconnection bool,
|
||||||
|
timestamps ConnectionStageTimestamps,
|
||||||
|
) {
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.impl.RecordConnectionStages(ctx, connectionType, isReconnection, timestamps)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSyncDuration records the duration of sync message processing
|
||||||
|
func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.impl.RecordSyncDuration(ctx, duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export exports metrics to the writer
|
||||||
|
func (c *ClientMetrics) Export(w io.Writer) error {
|
||||||
|
if c == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return c.impl.Export(w)
|
||||||
|
}
|
||||||
129
client/internal/metrics/victoria.go
Normal file
129
client/internal/metrics/victoria.go
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
package metrics
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/metrics"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// victoriaMetrics is the VictoriaMetrics implementation of ClientMetrics
|
||||||
|
type victoriaMetrics struct {
|
||||||
|
// Static agent information applied to all metrics
|
||||||
|
agentInfo AgentInfo
|
||||||
|
|
||||||
|
// Metrics set for managing all metrics
|
||||||
|
set *metrics.Set
|
||||||
|
}
|
||||||
|
|
||||||
|
func newVictoriaMetrics(agentInfo AgentInfo) metricsImplementation {
|
||||||
|
return &victoriaMetrics{
|
||||||
|
agentInfo: agentInfo,
|
||||||
|
set: metrics.NewSet(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordConnectionStages records the duration of each connection stage from timestamps
|
||||||
|
func (m *victoriaMetrics) RecordConnectionStages(
|
||||||
|
ctx context.Context,
|
||||||
|
connectionType ConnectionType,
|
||||||
|
isReconnection bool,
|
||||||
|
timestamps ConnectionStageTimestamps,
|
||||||
|
) {
|
||||||
|
// Calculate stage durations
|
||||||
|
var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64
|
||||||
|
|
||||||
|
if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() {
|
||||||
|
creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() {
|
||||||
|
semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() {
|
||||||
|
signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||||
|
connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate total duration:
|
||||||
|
// For initial connections: Created → WgHandshakeSuccess
|
||||||
|
// For reconnections: Signaling → WgHandshakeSuccess (since Created is not tracked)
|
||||||
|
if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||||
|
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds()
|
||||||
|
} else if !timestamps.Signaling.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||||
|
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Signaling).Seconds()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine attempt type
|
||||||
|
attemptType := "initial"
|
||||||
|
if isReconnection {
|
||||||
|
attemptType = "reconnection"
|
||||||
|
}
|
||||||
|
|
||||||
|
connTypeStr := connectionType.String()
|
||||||
|
|
||||||
|
// Record observations using histograms
|
||||||
|
m.set.GetOrCreateHistogram(
|
||||||
|
m.getMetricName("netbird_peer_connection_stage_creation_to_semaphore", connTypeStr, attemptType),
|
||||||
|
).Update(creationToSemaphore)
|
||||||
|
|
||||||
|
m.set.GetOrCreateHistogram(
|
||||||
|
m.getMetricName("netbird_peer_connection_stage_semaphore_to_signaling", connTypeStr, attemptType),
|
||||||
|
).Update(semaphoreToSignaling)
|
||||||
|
|
||||||
|
m.set.GetOrCreateHistogram(
|
||||||
|
m.getMetricName("netbird_peer_connection_stage_signaling_to_connection", connTypeStr, attemptType),
|
||||||
|
).Update(signalingToConnection)
|
||||||
|
|
||||||
|
m.set.GetOrCreateHistogram(
|
||||||
|
m.getMetricName("netbird_peer_connection_stage_connection_to_handshake", connTypeStr, attemptType),
|
||||||
|
).Update(connectionToHandshake)
|
||||||
|
|
||||||
|
m.set.GetOrCreateHistogram(
|
||||||
|
m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType),
|
||||||
|
).Update(totalDuration)
|
||||||
|
|
||||||
|
log.Tracef("peer connection metrics [%s, %s, %s]: creation→semaphore: %.3fs, semaphore→signaling: %.3fs, signaling→connection: %.3fs, connection→handshake: %.3fs, total: %.3fs",
|
||||||
|
m.agentInfo.DeploymentType.String(), connTypeStr, attemptType,
|
||||||
|
creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake,
|
||||||
|
totalDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
// getMetricName constructs a metric name with labels
|
||||||
|
func (m *victoriaMetrics) getMetricName(baseName, connectionType, attemptType string) string {
|
||||||
|
return fmt.Sprintf(`%s{deployment_type=%q,connection_type=%q,attempt_type=%q,version=%q}`,
|
||||||
|
baseName,
|
||||||
|
m.agentInfo.DeploymentType.String(),
|
||||||
|
connectionType,
|
||||||
|
attemptType,
|
||||||
|
m.agentInfo.Version,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSyncDuration records the duration of sync message processing
|
||||||
|
func (m *victoriaMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
|
||||||
|
metricName := fmt.Sprintf(`netbird_sync_duration_seconds{deployment_type=%q,version=%q}`,
|
||||||
|
m.agentInfo.DeploymentType.String(),
|
||||||
|
m.agentInfo.Version,
|
||||||
|
)
|
||||||
|
|
||||||
|
m.set.GetOrCreateHistogram(metricName).Update(duration.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Export writes metrics in Prometheus text format with HELP comments
|
||||||
|
func (m *victoriaMetrics) Export(w io.Writer) error {
|
||||||
|
if m.set == nil {
|
||||||
|
return fmt.Errorf("metrics set not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write metrics in Prometheus format
|
||||||
|
m.set.WritePrometheus(w)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
|
|
||||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||||
"github.com/netbirdio/netbird/client/iface/wgproxy"
|
"github.com/netbirdio/netbird/client/iface/wgproxy"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer/conntype"
|
"github.com/netbirdio/netbird/client/internal/peer/conntype"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
|
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer/guard"
|
"github.com/netbirdio/netbird/client/internal/peer/guard"
|
||||||
@@ -28,6 +29,16 @@ import (
|
|||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// MetricsRecorder is an interface for recording peer connection metrics
|
||||||
|
type MetricsRecorder interface {
|
||||||
|
RecordConnectionStages(
|
||||||
|
ctx context.Context,
|
||||||
|
connectionType metrics.ConnectionType,
|
||||||
|
isReconnection bool,
|
||||||
|
timestamps metrics.ConnectionStageTimestamps,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
type ServiceDependencies struct {
|
type ServiceDependencies struct {
|
||||||
StatusRecorder *Status
|
StatusRecorder *Status
|
||||||
Signaler *Signaler
|
Signaler *Signaler
|
||||||
@@ -36,6 +47,7 @@ type ServiceDependencies struct {
|
|||||||
SrWatcher *guard.SRWatcher
|
SrWatcher *guard.SRWatcher
|
||||||
Semaphore *semaphoregroup.SemaphoreGroup
|
Semaphore *semaphoregroup.SemaphoreGroup
|
||||||
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
||||||
|
MetricsRecorder MetricsRecorder
|
||||||
}
|
}
|
||||||
|
|
||||||
type WgConfig struct {
|
type WgConfig struct {
|
||||||
@@ -119,6 +131,10 @@ type Conn struct {
|
|||||||
dumpState *stateDump
|
dumpState *stateDump
|
||||||
|
|
||||||
endpointUpdater *EndpointUpdater
|
endpointUpdater *EndpointUpdater
|
||||||
|
|
||||||
|
// Connection stage timestamps for metrics
|
||||||
|
metricsRecorder MetricsRecorder
|
||||||
|
metricsStages MetricsStages
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConn creates a new not opened Conn to the remote peer.
|
// NewConn creates a new not opened Conn to the remote peer.
|
||||||
@@ -144,9 +160,11 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|||||||
statusICE: worker.NewAtomicStatus(),
|
statusICE: worker.NewAtomicStatus(),
|
||||||
dumpState: dumpState,
|
dumpState: dumpState,
|
||||||
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
|
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
|
||||||
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
|
metricsRecorder: services.MetricsRecorder,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState)
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -154,6 +172,20 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|||||||
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
||||||
// be used.
|
// be used.
|
||||||
func (conn *Conn) Open(engineCtx context.Context) error {
|
func (conn *Conn) Open(engineCtx context.Context) error {
|
||||||
|
conn.mu.Lock()
|
||||||
|
if conn.opened {
|
||||||
|
conn.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record the start time - beginning of connection attempt
|
||||||
|
conn.metricsStages = MetricsStages{}
|
||||||
|
conn.metricsStages.RecordCreated()
|
||||||
|
|
||||||
|
conn.mu.Unlock()
|
||||||
|
|
||||||
|
// Semaphore.Add() blocks here until there's a free slot
|
||||||
|
// todo create common semaphor logic for reconnection and connections too that can remote seats from semaphor on the fly
|
||||||
if err := conn.semaphore.Add(engineCtx); err != nil {
|
if err := conn.semaphore.Add(engineCtx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -166,6 +198,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Record when semaphore was acquired (after the wait)
|
||||||
|
conn.metricsStages.RecordSemaphoreAcquired()
|
||||||
|
|
||||||
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
||||||
|
|
||||||
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
|
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
|
||||||
@@ -178,7 +213,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
|||||||
}
|
}
|
||||||
conn.workerICE = workerICE
|
conn.workerICE = workerICE
|
||||||
|
|
||||||
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
|
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, &conn.metricsStages)
|
||||||
|
|
||||||
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
|
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
|
||||||
if !isForceRelayed() {
|
if !isForceRelayed() {
|
||||||
@@ -350,7 +385,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
|||||||
if conn.currentConnPriority > priority {
|
if conn.currentConnPriority > priority {
|
||||||
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
||||||
conn.statusICE.SetConnected()
|
conn.statusICE.SetConnected()
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo, time.Now())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -390,7 +425,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
|||||||
}
|
}
|
||||||
|
|
||||||
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
||||||
conn.enableWgWatcherIfNeeded()
|
updateTime := time.Now()
|
||||||
|
conn.enableWgWatcherIfNeeded(updateTime)
|
||||||
|
|
||||||
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
|
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
|
||||||
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
|
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
|
||||||
@@ -406,8 +442,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
|||||||
|
|
||||||
conn.currentConnPriority = priority
|
conn.currentConnPriority = priority
|
||||||
conn.statusICE.SetConnected()
|
conn.statusICE.SetConnected()
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo, updateTime)
|
||||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onICEStateDisconnected() {
|
func (conn *Conn) onICEStateDisconnected() {
|
||||||
@@ -455,6 +491,10 @@ func (conn *Conn) onICEStateDisconnected() {
|
|||||||
|
|
||||||
conn.disableWgWatcherIfNeeded()
|
conn.disableWgWatcherIfNeeded()
|
||||||
|
|
||||||
|
if conn.currentConnPriority == conntype.None {
|
||||||
|
conn.metricsStages.Disconnected()
|
||||||
|
}
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
ConnStatus: conn.evalStatus(),
|
ConnStatus: conn.evalStatus(),
|
||||||
@@ -495,14 +535,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
||||||
conn.setRelayedProxy(wgProxy)
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.statusRelay.SetConnected()
|
conn.statusRelay.SetConnected()
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
wgProxy.Work()
|
wgProxy.Work()
|
||||||
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
|
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
|
||||||
|
|
||||||
conn.enableWgWatcherIfNeeded()
|
updateTime := time.Now()
|
||||||
|
conn.enableWgWatcherIfNeeded(updateTime)
|
||||||
|
|
||||||
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
|
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
|
||||||
if err := wgProxy.CloseConn(); err != nil {
|
if err := wgProxy.CloseConn(); err != nil {
|
||||||
@@ -513,13 +554,14 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wgConfigWorkaround()
|
wgConfigWorkaround()
|
||||||
|
|
||||||
conn.rosenpassRemoteKey = rci.rosenpassPubKey
|
conn.rosenpassRemoteKey = rci.rosenpassPubKey
|
||||||
conn.currentConnPriority = conntype.Relay
|
conn.currentConnPriority = conntype.Relay
|
||||||
conn.statusRelay.SetConnected()
|
conn.statusRelay.SetConnected()
|
||||||
conn.setRelayedProxy(wgProxy)
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
|
||||||
conn.Log.Infof("start to communicate with peer via relay")
|
conn.Log.Infof("start to communicate with peer via relay")
|
||||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onRelayDisconnected() {
|
func (conn *Conn) onRelayDisconnected() {
|
||||||
@@ -557,6 +599,10 @@ func (conn *Conn) handleRelayDisconnectedLocked() {
|
|||||||
|
|
||||||
conn.disableWgWatcherIfNeeded()
|
conn.disableWgWatcherIfNeeded()
|
||||||
|
|
||||||
|
if conn.currentConnPriority == conntype.None {
|
||||||
|
conn.metricsStages.Disconnected()
|
||||||
|
}
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
ConnStatus: conn.evalStatus(),
|
ConnStatus: conn.evalStatus(),
|
||||||
@@ -573,6 +619,7 @@ func (conn *Conn) onGuardEvent() {
|
|||||||
if err := conn.handshaker.SendOffer(); err != nil {
|
if err := conn.handshaker.SendOffer(); err != nil {
|
||||||
conn.Log.Errorf("failed to send offer: %v", err)
|
conn.Log.Errorf("failed to send offer: %v", err)
|
||||||
}
|
}
|
||||||
|
conn.metricsStages.RecordSignaling()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onWGDisconnected() {
|
func (conn *Conn) onWGDisconnected() {
|
||||||
@@ -597,10 +644,10 @@ func (conn *Conn) onWGDisconnected() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
|
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: updateTime,
|
||||||
ConnStatus: conn.evalStatus(),
|
ConnStatus: conn.evalStatus(),
|
||||||
Relayed: conn.isRelayed(),
|
Relayed: conn.isRelayed(),
|
||||||
RelayServerAddress: relayServerAddr,
|
RelayServerAddress: relayServerAddr,
|
||||||
@@ -613,10 +660,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
|
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: updateTime,
|
||||||
ConnStatus: conn.evalStatus(),
|
ConnStatus: conn.evalStatus(),
|
||||||
Relayed: iceConnInfo.Relayed,
|
Relayed: iceConnInfo.Relayed,
|
||||||
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
|
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
|
||||||
@@ -654,11 +701,13 @@ func (conn *Conn) setStatusToDisconnected() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
|
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
|
||||||
if runtime.GOOS == "ios" {
|
if runtime.GOOS == "ios" {
|
||||||
runtime.GC()
|
runtime.GC()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
conn.metricsStages.RecordConnectionReady(updateTime)
|
||||||
|
|
||||||
if conn.onConnected != nil {
|
if conn.onConnected != nil {
|
||||||
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
|
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
|
||||||
}
|
}
|
||||||
@@ -723,14 +772,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) enableWgWatcherIfNeeded() {
|
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
|
||||||
if !conn.wgWatcher.IsEnabled() {
|
if !conn.wgWatcher.IsEnabled() {
|
||||||
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
||||||
conn.wgWatcherCancel = wgWatcherCancel
|
conn.wgWatcherCancel = wgWatcherCancel
|
||||||
conn.wgWatcherWg.Add(1)
|
conn.wgWatcherWg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer conn.wgWatcherWg.Done()
|
defer conn.wgWatcherWg.Done()
|
||||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
|
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -794,6 +843,36 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
|||||||
conn.wgProxyRelay = proxy
|
conn.wgProxyRelay = proxy
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
|
||||||
|
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
|
||||||
|
conn.metricsStages.RecordWGHandshakeSuccess(when)
|
||||||
|
conn.recordConnectionMetrics()
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordConnectionMetrics records connection stage timestamps as metrics
|
||||||
|
func (conn *Conn) recordConnectionMetrics() {
|
||||||
|
if conn.metricsRecorder == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine connection type based on current priority
|
||||||
|
var connType metrics.ConnectionType
|
||||||
|
switch conn.currentConnPriority {
|
||||||
|
case conntype.Relay:
|
||||||
|
connType = metrics.ConnectionTypeRelay
|
||||||
|
default:
|
||||||
|
connType = metrics.ConnectionTypeICE
|
||||||
|
}
|
||||||
|
|
||||||
|
// Record metrics with timestamps - duration calculation happens in metrics package
|
||||||
|
conn.metricsRecorder.RecordConnectionStages(
|
||||||
|
context.Background(),
|
||||||
|
connType,
|
||||||
|
conn.metricsStages.IsReconnection(),
|
||||||
|
conn.metricsStages.GetTimestamps(),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
// AllowedIP returns the allowed IP of the remote peer
|
// AllowedIP returns the allowed IP of the remote peer
|
||||||
func (conn *Conn) AllowedIP() netip.Addr {
|
func (conn *Conn) AllowedIP() netip.Addr {
|
||||||
return conn.config.WgConfig.AllowedIps[0].Addr()
|
return conn.config.WgConfig.AllowedIps[0].Addr()
|
||||||
|
|||||||
@@ -44,12 +44,13 @@ type OfferAnswer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Handshaker struct {
|
type Handshaker struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
signaler *Signaler
|
signaler *Signaler
|
||||||
ice *WorkerICE
|
ice *WorkerICE
|
||||||
relay *WorkerRelay
|
relay *WorkerRelay
|
||||||
|
metricsStages *MetricsStages
|
||||||
// relayListener is not blocking because the listener is using a goroutine to process the messages
|
// relayListener is not blocking because the listener is using a goroutine to process the messages
|
||||||
// and it will only keep the latest message if multiple offers are received in a short time
|
// and it will only keep the latest message if multiple offers are received in a short time
|
||||||
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
|
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
|
||||||
@@ -64,13 +65,14 @@ type Handshaker struct {
|
|||||||
remoteAnswerCh chan OfferAnswer
|
remoteAnswerCh chan OfferAnswer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay, metricsStages *MetricsStages) *Handshaker {
|
||||||
return &Handshaker{
|
return &Handshaker{
|
||||||
log: log,
|
log: log,
|
||||||
config: config,
|
config: config,
|
||||||
signaler: signaler,
|
signaler: signaler,
|
||||||
ice: ice,
|
ice: ice,
|
||||||
relay: relay,
|
relay: relay,
|
||||||
|
metricsStages: metricsStages,
|
||||||
remoteOffersCh: make(chan OfferAnswer),
|
remoteOffersCh: make(chan OfferAnswer),
|
||||||
remoteAnswerCh: make(chan OfferAnswer),
|
remoteAnswerCh: make(chan OfferAnswer),
|
||||||
}
|
}
|
||||||
@@ -89,6 +91,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
|||||||
select {
|
select {
|
||||||
case remoteOfferAnswer := <-h.remoteOffersCh:
|
case remoteOfferAnswer := <-h.remoteOffersCh:
|
||||||
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||||
|
|
||||||
|
// Record signaling received for reconnection attempts
|
||||||
|
if h.metricsStages != nil {
|
||||||
|
h.metricsStages.RecordSignalingReceived()
|
||||||
|
}
|
||||||
|
|
||||||
if h.relayListener != nil {
|
if h.relayListener != nil {
|
||||||
h.relayListener.Notify(&remoteOfferAnswer)
|
h.relayListener.Notify(&remoteOfferAnswer)
|
||||||
}
|
}
|
||||||
@@ -103,6 +111,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||||
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||||
|
|
||||||
|
// Record signaling received for reconnection attempts
|
||||||
|
if h.metricsStages != nil {
|
||||||
|
h.metricsStages.RecordSignalingReceived()
|
||||||
|
}
|
||||||
|
|
||||||
if h.relayListener != nil {
|
if h.relayListener != nil {
|
||||||
h.relayListener.Notify(&remoteOfferAnswer)
|
h.relayListener.Notify(&remoteOfferAnswer)
|
||||||
}
|
}
|
||||||
|
|||||||
104
client/internal/peer/metrics_saver.go
Normal file
104
client/internal/peer/metrics_saver.go
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MetricsStages struct {
|
||||||
|
isReconnectionAttempt bool // Track if current attempt is a reconnection
|
||||||
|
stageTimestamps metrics.ConnectionStageTimestamps
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) RecordCreated() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.stageTimestamps.Created = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) RecordSemaphoreAcquired() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
s.stageTimestamps.SemaphoreAcquired = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSignaling records the signaling timestamp when sending offers
|
||||||
|
// For initial connections: records when we start sending
|
||||||
|
// For reconnections: does nothing (we wait for RecordSignalingReceived)
|
||||||
|
func (s *MetricsStages) RecordSignaling() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.isReconnectionAttempt {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.stageTimestamps.Signaling.IsZero() {
|
||||||
|
s.stageTimestamps.Signaling = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RecordSignalingReceived records the signaling timestamp when receiving offers/answers
|
||||||
|
// For reconnections: records when we receive the first signal
|
||||||
|
// For initial connections: does nothing (already recorded in RecordSignaling)
|
||||||
|
func (s *MetricsStages) RecordSignalingReceived() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
// Only record for reconnections when we receive a signal
|
||||||
|
if s.isReconnectionAttempt && s.stageTimestamps.Signaling.IsZero() {
|
||||||
|
s.stageTimestamps.Signaling = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
if s.stageTimestamps.ConnectionReady.IsZero() {
|
||||||
|
s.stageTimestamps.ConnectionReady = when
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if !s.stageTimestamps.ConnectionReady.IsZero() {
|
||||||
|
// WireGuard only reports handshake times with second precision, but ConnectionReady
|
||||||
|
// is captured with microsecond precision. If handshake appears before ConnectionReady
|
||||||
|
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
|
||||||
|
// ConnectionReady to avoid negative duration metrics.
|
||||||
|
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
|
||||||
|
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
|
||||||
|
} else {
|
||||||
|
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
|
||||||
|
func (s *MetricsStages) Disconnected() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
// Reset all timestamps for reconnection
|
||||||
|
// For reconnections, we only track from Signaling onwards
|
||||||
|
// This avoids meaningless creation→semaphore and semaphore→signaling metrics
|
||||||
|
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
|
||||||
|
s.isReconnectionAttempt = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) IsReconnection() bool {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.isReconnectionAttempt
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
return s.stageTimestamps
|
||||||
|
}
|
||||||
@@ -45,7 +45,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
|
|||||||
|
|
||||||
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
||||||
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
|
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
|
||||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()) {
|
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
|
||||||
w.muEnabled.Lock()
|
w.muEnabled.Lock()
|
||||||
if w.enabled {
|
if w.enabled {
|
||||||
w.muEnabled.Unlock()
|
w.muEnabled.Unlock()
|
||||||
@@ -53,7 +53,6 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.log.Debugf("enable WireGuard watcher")
|
w.log.Debugf("enable WireGuard watcher")
|
||||||
enabledTime := time.Now()
|
|
||||||
w.enabled = true
|
w.enabled = true
|
||||||
w.muEnabled.Unlock()
|
w.muEnabled.Unlock()
|
||||||
|
|
||||||
@@ -62,7 +61,7 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
|
|||||||
w.log.Warnf("failed to read initial wg stats: %v", err)
|
w.log.Warnf("failed to read initial wg stats: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, enabledTime, initialHandshake)
|
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
|
||||||
|
|
||||||
w.muEnabled.Lock()
|
w.muEnabled.Lock()
|
||||||
w.enabled = false
|
w.enabled = false
|
||||||
@@ -77,7 +76,7 @@ func (w *WGWatcher) IsEnabled() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
||||||
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
|
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
|
||||||
w.log.Infof("WireGuard watcher started")
|
w.log.Infof("WireGuard watcher started")
|
||||||
|
|
||||||
timer := time.NewTimer(wgHandshakeOvertime)
|
timer := time.NewTimer(wgHandshakeOvertime)
|
||||||
@@ -96,6 +95,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
|
|||||||
if lastHandshake.IsZero() {
|
if lastHandshake.IsZero() {
|
||||||
elapsed := calcElapsed(enabledTime, *handshake)
|
elapsed := calcElapsed(enabledTime, *handshake)
|
||||||
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
|
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
|
||||||
|
if onHandshakeSuccessFn != nil {
|
||||||
|
onHandshakeSuccessFn(*handshake)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastHandshake = *handshake
|
lastHandshake = *handshake
|
||||||
|
|||||||
@@ -35,9 +35,11 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
onDisconnected := make(chan struct{}, 1)
|
onDisconnected := make(chan struct{}, 1)
|
||||||
go watcher.EnableWgWatcher(ctx, func() {
|
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||||
mlog.Infof("onDisconnectedFn")
|
mlog.Infof("onDisconnectedFn")
|
||||||
onDisconnected <- struct{}{}
|
onDisconnected <- struct{}{}
|
||||||
|
}, func(when time.Time) {
|
||||||
|
mlog.Infof("onHandshakeSuccess: %v", when)
|
||||||
})
|
})
|
||||||
|
|
||||||
// wait for initial reading
|
// wait for initial reading
|
||||||
@@ -64,7 +66,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
watcher.EnableWgWatcher(ctx, func() {})
|
watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
|
||||||
}()
|
}()
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
@@ -75,9 +77,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
onDisconnected := make(chan struct{}, 1)
|
onDisconnected := make(chan struct{}, 1)
|
||||||
go watcher.EnableWgWatcher(ctx, func() {
|
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||||
onDisconnected <- struct{}{}
|
onDisconnected <- struct{}{}
|
||||||
})
|
}, func(when time.Time) {})
|
||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
mocWgIface.disconnect()
|
mocWgIface.disconnect()
|
||||||
|
|||||||
@@ -26,6 +26,13 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
log.Warnf("failed to get latest sync response: %v", err)
|
log.Warnf("failed to get latest sync response: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var clientMetrics debug.MetricsExporter
|
||||||
|
if s.connectClient != nil {
|
||||||
|
if engine := s.connectClient.Engine(); engine != nil {
|
||||||
|
clientMetrics = engine.GetClientMetrics()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var cpuProfileData []byte
|
var cpuProfileData []byte
|
||||||
if s.cpuProfileBuf != nil && !s.cpuProfiling {
|
if s.cpuProfileBuf != nil && !s.cpuProfiling {
|
||||||
cpuProfileData = s.cpuProfileBuf.Bytes()
|
cpuProfileData = s.cpuProfileBuf.Bytes()
|
||||||
@@ -54,6 +61,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
LogPath: s.logFile,
|
LogPath: s.logFile,
|
||||||
CPUProfile: cpuProfileData,
|
CPUProfile: cpuProfileData,
|
||||||
RefreshStatus: refreshStatus,
|
RefreshStatus: refreshStatus,
|
||||||
|
ClientMetrics: clientMetrics,
|
||||||
},
|
},
|
||||||
debug.BundleConfig{
|
debug.BundleConfig{
|
||||||
Anonymize: req.GetAnonymize(),
|
Anonymize: req.GetAnonymize(),
|
||||||
|
|||||||
3
go.mod
3
go.mod
@@ -33,6 +33,7 @@ require (
|
|||||||
fyne.io/fyne/v2 v2.7.0
|
fyne.io/fyne/v2 v2.7.0
|
||||||
fyne.io/systray v1.12.1-0.20260116214250-81f8e1a496f9
|
fyne.io/systray v1.12.1-0.20260116214250-81f8e1a496f9
|
||||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible
|
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible
|
||||||
|
github.com/VictoriaMetrics/metrics v1.40.2
|
||||||
github.com/awnumar/memguard v0.23.0
|
github.com/awnumar/memguard v0.23.0
|
||||||
github.com/aws/aws-sdk-go-v2 v1.36.3
|
github.com/aws/aws-sdk-go-v2 v1.36.3
|
||||||
github.com/aws/aws-sdk-go-v2/config v1.29.14
|
github.com/aws/aws-sdk-go-v2/config v1.29.14
|
||||||
@@ -263,6 +264,8 @@ require (
|
|||||||
github.com/stretchr/objx v0.5.2 // indirect
|
github.com/stretchr/objx v0.5.2 // indirect
|
||||||
github.com/tklauser/go-sysconf v0.3.14 // indirect
|
github.com/tklauser/go-sysconf v0.3.14 // indirect
|
||||||
github.com/tklauser/numcpus v0.8.0 // indirect
|
github.com/tklauser/numcpus v0.8.0 // indirect
|
||||||
|
github.com/valyala/fastrand v1.1.0 // indirect
|
||||||
|
github.com/valyala/histogram v1.2.0 // indirect
|
||||||
github.com/vishvananda/netns v0.0.5 // indirect
|
github.com/vishvananda/netns v0.0.5 // indirect
|
||||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||||
github.com/wlynxg/anet v0.0.5 // indirect
|
github.com/wlynxg/anet v0.0.5 // indirect
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -38,6 +38,8 @@ github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6
|
|||||||
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
|
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
|
||||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
|
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
|
||||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
|
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac=
|
||||||
|
github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA=
|
||||||
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktpoUAgOJK3OTFc/xug0PCXYCqU0FgDKI=
|
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktpoUAgOJK3OTFc/xug0PCXYCqU0FgDKI=
|
||||||
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
|
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
|
||||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
|
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
|
||||||
@@ -574,6 +576,10 @@ github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYg
|
|||||||
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
|
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
|
||||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||||
|
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
|
||||||
|
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
|
||||||
|
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
|
||||||
|
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
|
||||||
github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW6bV0=
|
github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW6bV0=
|
||||||
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
|
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
|
||||||
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
|
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ type Client interface {
|
|||||||
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
||||||
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||||
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
|
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
|
||||||
|
GetServerURL() string
|
||||||
IsHealthy() bool
|
IsHealthy() bool
|
||||||
SyncMeta(sysInfo *system.Info) error
|
SyncMeta(sysInfo *system.Info) error
|
||||||
Logout() error
|
Logout() error
|
||||||
|
|||||||
@@ -46,6 +46,7 @@ type GrpcClient struct {
|
|||||||
conn *grpc.ClientConn
|
conn *grpc.ClientConn
|
||||||
connStateCallback ConnStateNotifier
|
connStateCallback ConnStateNotifier
|
||||||
connStateCallbackLock sync.RWMutex
|
connStateCallbackLock sync.RWMutex
|
||||||
|
serverURL string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates a new client to Management service
|
// NewClient creates a new client to Management service
|
||||||
@@ -75,9 +76,15 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
|
|||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
connStateCallbackLock: sync.RWMutex{},
|
connStateCallbackLock: sync.RWMutex{},
|
||||||
|
serverURL: addr,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetServerURL returns the management server URL
|
||||||
|
func (c *GrpcClient) GetServerURL() string {
|
||||||
|
return c.serverURL
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes connection to the Management Service
|
// Close closes connection to the Management Service
|
||||||
func (c *GrpcClient) Close() error {
|
func (c *GrpcClient) Close() error {
|
||||||
return c.conn.Close()
|
return c.conn.Close()
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ type MockClient struct {
|
|||||||
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||||
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
||||||
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||||
|
GetServerURLFunc func() string
|
||||||
SyncMetaFunc func(sysInfo *system.Info) error
|
SyncMetaFunc func(sysInfo *system.Info) error
|
||||||
LogoutFunc func() error
|
LogoutFunc func() error
|
||||||
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||||
@@ -88,6 +89,14 @@ func (m *MockClient) GetNetworkMap(_ *system.Info) (*proto.NetworkMap, error) {
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetServerURL mock implementation of GetServerURL from mgm.Client interface
|
||||||
|
func (m *MockClient) GetServerURL() string {
|
||||||
|
if m.GetServerURLFunc == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return m.GetServerURLFunc()
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
|
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
|
||||||
if m.SyncMetaFunc == nil {
|
if m.SyncMetaFunc == nil {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user