Merge main branch into feature/client-metrics

This commit is contained in:
Zoltán Papp
2026-01-21 11:22:01 +01:00
162 changed files with 16301 additions and 2734 deletions

View File

@@ -100,8 +100,9 @@ type Conn struct {
relayManager *relayClient.Manager
srWatcher *guard.SRWatcher
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
onDisconnected func(remotePeer string)
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
onDisconnected func(remotePeer string)
rosenpassInitializedPresharedKeyValidator func(peerKey string) bool
statusRelay *worker.AtomicWorkerStatus
statusICE *worker.AtomicWorkerStatus
@@ -110,8 +111,10 @@ type Conn struct {
workerICE *WorkerICE
workerRelay *WorkerRelay
wgWatcher *WGWatcher
wgWatcherWg sync.WaitGroup
wgWatcher *WGWatcher
wgWatcherWg sync.WaitGroup
wgWatcherCancel context.CancelFunc
// used to store the remote Rosenpass key for Relayed connection in case of connection update from ice
rosenpassRemoteKey []byte
@@ -130,11 +133,8 @@ type Conn struct {
endpointUpdater *EndpointUpdater
// Connection stage timestamps for metrics
metricsRecorder MetricsRecorder
hasBeenConnected bool // Track if we've ever established a successful connection
isReconnectionAttempt bool // Track if current attempt is a reconnection
stageTimestamps metrics.ConnectionStageTimestamps
stagesMutex sync.Mutex
metricsRecorder MetricsRecorder
metricsStages MetricsStages
}
// NewConn creates a new not opened Conn to the remote peer.
@@ -163,7 +163,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
metricsRecorder: services.MetricsRecorder,
}
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState, conn.onWGHandshakeSuccess)
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState)
return conn, nil
}
@@ -172,21 +172,24 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open(engineCtx context.Context) error {
conn.mu.Lock()
if conn.opened {
conn.mu.Unlock()
return nil
}
// Record the start time - beginning of connection attempt
conn.stagesMutex.Lock()
conn.stageTimestamps.Created = time.Now()
conn.stagesMutex.Unlock()
conn.metricsStages = MetricsStages{}
conn.metricsStages.RecordCreated()
conn.mu.Unlock()
// Semaphore.Add() blocks here until there's a free slot
// todo: create common semaphore logic for reconnections and connections too that can remove seats from the semaphore on the fly
if err := conn.semaphore.Add(engineCtx); err != nil {
return err
}
// Record when semaphore was acquired (after the wait)
conn.stagesMutex.Lock()
conn.stageTimestamps.SemaphoreAcquired = time.Now()
conn.stagesMutex.Unlock()
conn.mu.Lock()
defer conn.mu.Unlock()
@@ -195,9 +198,12 @@ func (conn *Conn) Open(engineCtx context.Context) error {
return nil
}
// Record when semaphore was acquired (after the wait)
conn.metricsStages.RecordSemaphoreAcquired()
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager, conn.dumpState)
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
@@ -266,7 +272,9 @@ func (conn *Conn) Close(signalToRemote bool) {
conn.Log.Infof("close peer connection")
conn.ctxCancel()
conn.wgWatcher.DisableWgWatcher()
if conn.wgWatcherCancel != nil {
conn.wgWatcherCancel()
}
conn.workerRelay.CloseConn()
conn.workerICE.Close()
@@ -296,13 +304,6 @@ func (conn *Conn) Close(signalToRemote bool) {
conn.setStatusToDisconnected()
// Reset connection metrics state
conn.stagesMutex.Lock()
conn.hasBeenConnected = false
conn.isReconnectionAttempt = false
conn.stageTimestamps = metrics.ConnectionStageTimestamps{}
conn.stagesMutex.Unlock()
conn.opened = false
conn.wg.Wait()
conn.Log.Infof("peer connection closed")
@@ -332,25 +333,17 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
conn.onDisconnected = handler
}
// SetRosenpassInitializedPresharedKeyValidator sets a function to check if Rosenpass has taken over
// PSK management for a peer. When this returns true, presharedKey() returns nil
// to prevent UpdatePeer from overwriting the Rosenpass-managed PSK.
//
// presharedKey() invokes the handler with the remote peer key (conn.config.Key) and
// skips the check entirely when no handler has been set.
// NOTE(review): the assignment below is not guarded by conn.mu in this method — confirm
// that callers only set the handler before the connection is opened.
func (conn *Conn) SetRosenpassInitializedPresharedKeyValidator(handler func(peerKey string) bool) {
conn.rosenpassInitializedPresharedKeyValidator = handler
}
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
conn.dumpState.RemoteOffer()
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
conn.stagesMutex.Lock()
// Detect reconnection: we had been connected before, but now both ICE and Relay are disconnected
if conn.hasBeenConnected && conn.evalStatus() != StatusConnected {
conn.isReconnectionAttempt = true
conn.stageTimestamps = metrics.ConnectionStageTimestamps{} // Reset for new reconnection attempt
now := time.Now()
conn.stageTimestamps.Created = now
conn.stageTimestamps.Signaling = now
conn.Log.Infof("Reconnection triggered by remote offer")
} else if conn.stageTimestamps.Signaling.IsZero() {
// First time receiving offer for this connection attempt (signaling start)
conn.stageTimestamps.Signaling = time.Now()
}
conn.stagesMutex.Unlock()
conn.handshaker.OnRemoteOffer(offer)
}
@@ -425,9 +418,6 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
ep = directEp
}
conn.wgWatcher.DisableWgWatcher()
// todo consider to run conn.wgWatcherWg.Wait() here
if conn.wgProxyRelay != nil {
conn.wgProxyRelay.Pause()
}
@@ -438,6 +428,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
conn.enableWgWatcherIfNeeded()
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
conn.handleConfigurationFailure(err, wgProxy)
return
@@ -449,12 +440,6 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.wgProxyRelay.RedirectAs(ep)
}
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(conn.ctx, nil)
}()
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
@@ -488,17 +473,10 @@ func (conn *Conn) onICEStateDisconnected() {
conn.Log.Errorf("failed to switch to relay conn: %v", err)
}
conn.wgWatcher.DisableWgWatcher()
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected)
}()
conn.wgProxyRelay.Work()
conn.currentConnPriority = conntype.Relay
} else {
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())
conn.wgWatcher.DisableWgWatcher()
conn.currentConnPriority = conntype.None
if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
@@ -511,15 +489,19 @@ func (conn *Conn) onICEStateDisconnected() {
}
conn.statusICE.SetDisconnected()
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
Relayed: conn.isRelayed(),
ConnStatusUpdate: time.Now(),
}
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
if err != nil {
if err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState); err != nil {
conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
}
}
@@ -559,6 +541,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
wgProxy.Work()
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
conn.enableWgWatcherIfNeeded()
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.Log.Warnf("Failed to close relay connection: %v", err)
@@ -567,13 +550,6 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
return
}
conn.wgWatcher.DisableWgWatcher()
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(conn.ctx, conn.onWGDisconnected)
}()
wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay
@@ -587,7 +563,11 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
// onRelayDisconnected is the lock-acquiring entry point for relay disconnection handling.
// It takes conn.mu for the duration of the call and delegates the actual work to
// handleRelayDisconnectedLocked, which expects the mutex to be held already.
func (conn *Conn) onRelayDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.handleRelayDisconnectedLocked()
}
// handleRelayDisconnectedLocked handles relay disconnection. Caller must hold conn.mu.
func (conn *Conn) handleRelayDisconnectedLocked() {
if conn.ctx.Err() != nil {
return
}
@@ -613,6 +593,12 @@ func (conn *Conn) onRelayDisconnected() {
}
conn.statusRelay.SetDisconnected()
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -628,15 +614,30 @@ func (conn *Conn) onGuardEvent() {
conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
}
conn.metricsStages.RecordSignaling()
}
func (conn *Conn) onWGDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
// Record signaling start timestamp (first signal sent)
conn.stagesMutex.Lock()
if conn.stageTimestamps.Signaling.IsZero() {
conn.stageTimestamps.Signaling = time.Now()
conn.Log.Warnf("WireGuard handshake timeout detected, closing current connection")
// Close the active connection based on current priority
switch conn.currentConnPriority {
case conntype.Relay:
conn.workerRelay.CloseConn()
conn.handleRelayDisconnectedLocked()
case conntype.ICEP2P, conntype.ICETurn:
conn.workerICE.Close()
default:
conn.Log.Debugf("No active connection to close on WG timeout")
}
conn.stagesMutex.Unlock()
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
@@ -701,19 +702,7 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd
runtime.GC()
}
// Record connection ready timestamp and mark as connected
conn.stagesMutex.Lock()
if conn.stageTimestamps.ConnectionReady.IsZero() {
conn.stageTimestamps.ConnectionReady = time.Now()
}
// Mark that we've established a connection
conn.hasBeenConnected = true
// todo: remove this when fixed the wireguard watcher
conn.stageTimestamps.WgHandshakeSuccess = time.Now()
conn.recordConnectionMetrics()
conn.stagesMutex.Unlock()
conn.metricsStages.RecordConnectionReady()
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
@@ -759,10 +748,17 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
}
}()
if runtime.GOOS != "js" && conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
// For JS platform: only relay connection is supported
if runtime.GOOS == "js" {
return conn.statusRelay.Get() == worker.StatusConnected
}
// For non-JS platforms: check ICE connection status
if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
return false
}
// If relay is supported with peer, it must also be connected
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
if conn.statusRelay.Get() == worker.StatusDisconnected {
return false
@@ -772,6 +768,26 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
return true
}
// enableWgWatcherIfNeeded starts the WireGuard watcher in its own goroutine unless the
// watcher already reports itself as enabled.
//
// The watcher runs on a context derived from conn.ctx; its cancel function is stored in
// conn.wgWatcherCancel so the watcher can be stopped independently of the connection.
// The goroutine is tracked by conn.wgWatcherWg. The enable time is captured before the
// goroutine is spawned and handed to the watcher together with the disconnect and
// handshake-success callbacks.
func (conn *Conn) enableWgWatcherIfNeeded() {
	if conn.wgWatcher.IsEnabled() {
		return
	}

	watcherCtx, cancel := context.WithCancel(conn.ctx)
	conn.wgWatcherCancel = cancel

	conn.wgWatcherWg.Add(1)
	enabledAt := time.Now()
	go func() {
		defer conn.wgWatcherWg.Done()
		conn.wgWatcher.EnableWgWatcher(watcherCtx, enabledAt, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
	}()
}
// disableWgWatcherIfNeeded cancels the WireGuard watcher context, but only when no
// connection is active anymore (current priority is conntype.None) and a cancel function
// is stored. The stored cancel function is cleared afterwards so it is invoked at most
// once from here.
func (conn *Conn) disableWgWatcherIfNeeded() {
	if conn.currentConnPriority != conntype.None || conn.wgWatcherCancel == nil {
		return
	}

	conn.wgWatcherCancel()
	conn.wgWatcherCancel = nil
}
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
conn.Log.Debugf("setup proxied WireGuard connection")
udpAddr := &net.UDPAddr{
@@ -824,29 +840,17 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
conn.wgProxyRelay = proxy
}
// onWGDisconnected is called when the WireGuard handshake times out
func (conn *Conn) onWGDisconnected() {
conn.workerRelay.CloseConn()
conn.onRelayDisconnected()
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess() {
conn.stagesMutex.Lock()
defer conn.stagesMutex.Unlock()
/*
if conn.stageTimestamps.WgHandshakeSuccess.IsZero() {
conn.stageTimestamps.WgHandshakeSuccess = time.Now()
conn.recordConnectionMetrics()
}
*/
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
conn.metricsStages.RecordWGHandshakeSuccess(when)
conn.recordConnectionMetrics()
}
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
log.Infof("--- record Metrics")
if conn.metricsRecorder == nil {
log.Infof("--- is nil")
return
}
@@ -859,12 +863,13 @@ func (conn *Conn) recordConnectionMetrics() {
connType = metrics.ConnectionTypeICE
}
log.Infof("-- record: connType: %v, %v, %v", connType, conn.metricsStages.IsReconnection(), conn.metricsStages.GetTimestamps())
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),
connType,
conn.isReconnectionAttempt,
conn.stageTimestamps,
conn.metricsStages.IsReconnection(),
conn.metricsStages.GetTimestamps(),
)
}
@@ -886,10 +891,24 @@ func (conn *Conn) presharedKey(remoteRosenpassKey []byte) *wgtypes.Key {
return conn.config.WgConfig.PreSharedKey
}
// If Rosenpass has already set a PSK for this peer, return nil to prevent
// UpdatePeer from overwriting the Rosenpass-managed key.
if conn.rosenpassInitializedPresharedKeyValidator != nil && conn.rosenpassInitializedPresharedKeyValidator(conn.config.Key) {
return nil
}
// Use NetBird PSK as the seed for Rosenpass. This same PSK is passed to
// Rosenpass as PeerConfig.PresharedKey, ensuring the derived post-quantum
// key is cryptographically bound to the original secret.
if conn.config.WgConfig.PreSharedKey != nil {
return conn.config.WgConfig.PreSharedKey
}
// Fallback to deterministic key if no NetBird PSK is configured
determKey, err := conn.rosenpassDetermKey()
if err != nil {
conn.Log.Errorf("failed to generate Rosenpass initial key: %v", err)
return conn.config.WgConfig.PreSharedKey
return nil
}
return determKey

View File

@@ -284,3 +284,27 @@ func TestConn_presharedKey(t *testing.T) {
})
}
}
func TestConn_presharedKey_RosenpassManaged(t *testing.T) {
conn := Conn{
config: ConnConfig{
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
RosenpassConfig: RosenpassConfig{PubKey: []byte("dummykey")},
},
}
// When Rosenpass has already initialized the PSK for this peer,
// presharedKey must return nil to avoid UpdatePeer overwriting it.
conn.rosenpassInitializedPresharedKeyValidator = func(peerKey string) bool { return true }
if k := conn.presharedKey([]byte("remote")); k != nil {
t.Fatalf("expected nil presharedKey when Rosenpass manages PSK, got %v", k)
}
// When Rosenpass hasn't taken over yet, presharedKey should provide
// a non-nil initial key (deterministic or from NetBird PSK).
conn.rosenpassInitializedPresharedKeyValidator = func(peerKey string) bool { return false }
if k := conn.presharedKey([]byte("remote")); k == nil {
t.Fatalf("expected non-nil presharedKey before Rosenpass manages PSK")
}
}

View File

@@ -0,0 +1,88 @@
package peer
import (
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics"
)
// MetricsStages collects the connection stage timestamps of the current connection
// attempt and remembers whether that attempt is a reconnection.
// Every method of the type takes mu before touching the other fields.
type MetricsStages struct {
	mu sync.Mutex

	// stageTimestamps holds the stage timestamps recorded so far.
	stageTimestamps metrics.ConnectionStageTimestamps
	// isReconnectionAttempt tracks if the current attempt is a reconnection.
	isReconnectionAttempt bool
}
// RecordCreated stamps the Created stage with the current time, overwriting any
// previously stored value.
func (s *MetricsStages) RecordCreated() {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.Infof("--- RecordCreated")

	createdAt := time.Now()
	s.stageTimestamps.Created = createdAt
}
// RecordSemaphoreAcquired stamps the SemaphoreAcquired stage with the current time,
// overwriting any previously stored value.
func (s *MetricsStages) RecordSemaphoreAcquired() {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.Infof("--- RecordSemaphoreAcquired")

	acquiredAt := time.Now()
	s.stageTimestamps.SemaphoreAcquired = acquiredAt
}
// RecordSignaling stamps the Signaling stage with the current time. Only the first
// call after a reset has an effect; later calls keep the existing timestamp.
func (s *MetricsStages) RecordSignaling() {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.Infof("--- RecordSignaling")

	// Keep the earliest signaling timestamp of this attempt.
	if !s.stageTimestamps.Signaling.IsZero() {
		return
	}

	log.Infof("--- Recorded Signaling")
	s.stageTimestamps.Signaling = time.Now()
}
// RecordConnectionReady stamps the ConnectionReady stage with the current time. Only
// the first call after a reset has an effect; later calls keep the existing timestamp.
func (s *MetricsStages) RecordConnectionReady() {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.Infof("--- RecordConnectionReady")

	// Keep the earliest ready timestamp of this attempt.
	if !s.stageTimestamps.ConnectionReady.IsZero() {
		return
	}

	log.Infof("--- Recorded ConnectionReady")
	s.stageTimestamps.ConnectionReady = time.Now()
}
// RecordWGHandshakeSuccess stores the given handshake time as the WgHandshakeSuccess
// stage. Despite its name, elapsed is a point in time, not a duration.
// The value is ignored while no ConnectionReady timestamp has been recorded.
func (s *MetricsStages) RecordWGHandshakeSuccess(elapsed time.Time) {
	s.mu.Lock()
	defer s.mu.Unlock()

	log.Infof("--- record: %v, %v", s.stageTimestamps.ConnectionReady, elapsed)

	if s.stageTimestamps.ConnectionReady.IsZero() {
		return
	}

	// todo: check if it is earlier than ConnectionReady
	s.stageTimestamps.WgHandshakeSuccess = elapsed
}
// Disconnected resets the recorded stages for a new attempt and marks that attempt as
// a reconnection. Created and SemaphoreAcquired are both set to the current time; all
// other stage timestamps are cleared.
func (s *MetricsStages) Disconnected() {
	log.Infof("--- Disconnected")

	s.mu.Lock()
	defer s.mu.Unlock()

	restartedAt := time.Now()

	var fresh metrics.ConnectionStageTimestamps
	fresh.Created = restartedAt
	fresh.SemaphoreAcquired = restartedAt

	s.stageTimestamps = fresh
	s.isReconnectionAttempt = true
}
// IsReconnection reports whether the current attempt has been marked as a reconnection.
func (s *MetricsStages) IsReconnection() bool {
	s.mu.Lock()
	reconnection := s.isReconnectionAttempt
	s.mu.Unlock()

	return reconnection
}
// GetTimestamps returns a copy of the stage timestamps recorded so far.
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
	s.mu.Lock()
	snapshot := s.stageTimestamps
	s.mu.Unlock()

	return snapshot
}

View File

@@ -14,6 +14,7 @@ import (
"golang.org/x/exp/maps"
"google.golang.org/grpc/codes"
gstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
firewall "github.com/netbirdio/netbird/client/firewall/manager"
@@ -158,6 +159,7 @@ type FullStatus struct {
NSGroupStates []NSGroupState
NumOfForwardingRules int
LazyConnectionEnabled bool
Events []*proto.SystemEvent
}
type StatusChangeSubscription struct {
@@ -981,6 +983,7 @@ func (d *Status) GetFullStatus() FullStatus {
}
fullStatus.Peers = append(fullStatus.Peers, d.offlinePeers...)
fullStatus.Events = d.GetEventHistory()
return fullStatus
}
@@ -1181,3 +1184,97 @@ type EventSubscription struct {
func (s *EventSubscription) Events() <-chan *proto.SystemEvent {
return s.events
}
// ToProto converts FullStatus to proto.FullStatus.
//
// Management, signal and local peer state are always populated; error values are
// rendered through Error() and left empty when nil. Peers, relays and DNS server groups
// are converted entry by entry in slice order, and the event list is passed through
// as is.
func (fs FullStatus) ToProto() *proto.FullStatus {
	managementState := &proto.ManagementState{
		URL:       fs.ManagementState.URL,
		Connected: fs.ManagementState.Connected,
	}
	if fs.ManagementState.Error != nil {
		managementState.Error = fs.ManagementState.Error.Error()
	}

	signalState := &proto.SignalState{
		URL:       fs.SignalState.URL,
		Connected: fs.SignalState.Connected,
	}
	if fs.SignalState.Error != nil {
		signalState.Error = fs.SignalState.Error.Error()
	}

	localPeerState := &proto.LocalPeerState{
		IP:                  fs.LocalPeerState.IP,
		PubKey:              fs.LocalPeerState.PubKey,
		KernelInterface:     fs.LocalPeerState.KernelInterface,
		Fqdn:                fs.LocalPeerState.FQDN,
		RosenpassPermissive: fs.RosenpassState.Permissive,
		RosenpassEnabled:    fs.RosenpassState.Enabled,
		Networks:            maps.Keys(fs.LocalPeerState.Routes),
	}

	result := &proto.FullStatus{
		ManagementState:         managementState,
		SignalState:             signalState,
		LocalPeerState:          localPeerState,
		Peers:                   []*proto.PeerState{},
		NumberOfForwardingRules: int32(fs.NumOfForwardingRules),
		LazyConnectionEnabled:   fs.LazyConnectionEnabled,
		Events:                  fs.Events,
	}

	// Remote peers.
	for _, peer := range fs.Peers {
		result.Peers = append(result.Peers, &proto.PeerState{
			IP:                         peer.IP,
			PubKey:                     peer.PubKey,
			ConnStatus:                 peer.ConnStatus.String(),
			ConnStatusUpdate:           timestamppb.New(peer.ConnStatusUpdate),
			Relayed:                    peer.Relayed,
			LocalIceCandidateType:      peer.LocalIceCandidateType,
			RemoteIceCandidateType:     peer.RemoteIceCandidateType,
			LocalIceCandidateEndpoint:  peer.LocalIceCandidateEndpoint,
			RemoteIceCandidateEndpoint: peer.RemoteIceCandidateEndpoint,
			RelayAddress:               peer.RelayServerAddress,
			Fqdn:                       peer.FQDN,
			LastWireguardHandshake:     timestamppb.New(peer.LastWireguardHandshake),
			BytesRx:                    peer.BytesRx,
			BytesTx:                    peer.BytesTx,
			RosenpassEnabled:           peer.RosenpassEnabled,
			Networks:                   maps.Keys(peer.GetRoutes()),
			Latency:                    durationpb.New(peer.Latency),
			SshHostKey:                 peer.SSHHostKey,
		})
	}

	// Relay servers: a relay counts as available exactly when it carries no error.
	for _, relay := range fs.Relays {
		entry := &proto.RelayState{URI: relay.URI}
		if relay.Err == nil {
			entry.Available = true
		} else {
			entry.Error = relay.Err.Error()
		}
		result.Relays = append(result.Relays, entry)
	}

	// DNS nameserver groups.
	for _, group := range fs.NSGroupStates {
		var servers []string
		for _, server := range group.Servers {
			servers = append(servers, server.String())
		}

		entry := &proto.NSGroupState{
			Servers: servers,
			Domains: group.Domains,
			Enabled: group.Enabled,
		}
		if group.Error != nil {
			entry.Error = group.Error.Error()
		}
		result.DnsServers = append(result.DnsServers, entry)
	}

	return result
}

View File

@@ -30,71 +30,57 @@ type WGWatcher struct {
peerKey string
stateDump *stateDump
ctx context.Context
ctxCancel context.CancelFunc
ctxLock sync.Mutex
enabledTime time.Time
onFirstHandshakeFn func()
enabled bool
muEnabled sync.RWMutex
}
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump, onFirstHandshakeFn func()) *WGWatcher {
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
return &WGWatcher{
log: log,
wgIfaceStater: wgIfaceStater,
peerKey: peerKey,
stateDump: stateDump,
onFirstHandshakeFn: onFirstHandshakeFn,
log: log,
wgIfaceStater: wgIfaceStater,
peerKey: peerKey,
stateDump: stateDump,
}
}
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
w.log.Debugf("enable WireGuard watcher")
w.ctxLock.Lock()
w.enabledTime = time.Now()
if w.ctx != nil && w.ctx.Err() == nil {
w.log.Errorf("WireGuard watcher already enabled")
w.ctxLock.Unlock()
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
w.muEnabled.Lock()
if w.enabled {
w.muEnabled.Unlock()
return
}
ctx, ctxCancel := context.WithCancel(parentCtx)
w.ctx = ctx
w.ctxCancel = ctxCancel
w.ctxLock.Unlock()
w.log.Debugf("enable WireGuard watcher")
w.enabled = true
w.muEnabled.Unlock()
initialHandshake, err := w.wgState()
if err != nil {
w.log.Warnf("failed to read initial wg stats: %v", err)
}
w.periodicHandshakeCheck(ctx, ctxCancel, onDisconnectedFn, initialHandshake)
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
w.muEnabled.Lock()
w.enabled = false
w.muEnabled.Unlock()
}
// DisableWgWatcher stops the WireGuard watcher and wait for the watcher to exit
func (w *WGWatcher) DisableWgWatcher() {
w.ctxLock.Lock()
defer w.ctxLock.Unlock()
if w.ctxCancel == nil {
return
}
w.log.Debugf("disable WireGuard watcher")
w.ctxCancel()
w.ctxCancel = nil
// IsEnabled reports whether the WireGuard watcher is currently enabled.
// The flag is read under the read lock of muEnabled.
func (w *WGWatcher) IsEnabled() bool {
	w.muEnabled.RLock()
	enabled := w.enabled
	w.muEnabled.RUnlock()

	return enabled
}
// periodicHandshakeCheck helps to check the state of the WireGuard handshake and relay connection
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
w.log.Infof("WireGuard watcher started")
timer := time.NewTimer(wgHandshakeOvertime)
defer timer.Stop()
defer ctxCancel()
lastHandshake := initialHandshake
@@ -103,17 +89,13 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
case <-timer.C:
handshake, ok := w.handshakeCheck(lastHandshake)
if !ok {
if onDisconnectedFn != nil {
onDisconnectedFn()
}
onDisconnectedFn()
return
}
if lastHandshake.IsZero() {
elapsed := w.calcElapsed(handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
if w.onFirstHandshakeFn != nil {
w.onFirstHandshakeFn()
}
elapsed := calcElapsed(enabledTime, *handshake)
w.log.Infof("--- first wg handshake detected within: %.2fsec, (%s - %s)", elapsed, enabledTime, handshake)
onHandshakeSuccessFn(*handshake)
}
lastHandshake = *handshake
@@ -175,8 +157,8 @@ func (w *WGWatcher) wgState() (time.Time, error) {
// calcElapsed calculates elapsed time since watcher was enabled.
// The watcher is started after the wg configuration happens; because of this, a negative value needs to be normalised to zero
func (w *WGWatcher) calcElapsed(handshake *time.Time) float64 {
elapsed := handshake.Sub(w.enabledTime).Seconds()
func calcElapsed(enabledTime, handshake time.Time) float64 {
elapsed := handshake.Sub(enabledTime).Seconds()
if elapsed < 0 {
elapsed = 0
}

View File

@@ -2,6 +2,7 @@ package peer
import (
"context"
"sync"
"testing"
"time"
@@ -28,15 +29,17 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil)
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
// Start the watcher with the current time as its enable time. The first callback fires
// on a handshake timeout, the second receives the time of the first detected handshake.
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
	mlog.Infof("onDisconnectedFn")
	onDisconnected <- struct{}{}
}, func(when time.Time) {
	mlog.Infof("onHandshakeSuccess: %s", when)
})
// wait for initial reading
@@ -48,7 +51,6 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
case <-time.After(10 * time.Second):
t.Errorf("timeout")
}
watcher.DisableWgWatcher()
}
func TestWGWatcher_ReEnable(t *testing.T) {
@@ -57,20 +59,27 @@ func TestWGWatcher_ReEnable(t *testing.T) {
mlog := log.WithField("peer", "tet")
mocWgIface := &MocWgIface{}
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}), nil)
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
// Run the first watcher until its context is cancelled; wg lets the test wait for the
// watcher to exit before it is enabled again.
go func() {
	defer wg.Done()
	watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
}()
cancel()
wg.Wait()
// Re-enable with a new context
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {})
time.Sleep(1 * time.Second)
watcher.DisableWgWatcher()
// Second enable: the disconnect callback signals the test; the handshake callback is a no-op.
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
	onDisconnected <- struct{}{}
}, func(when time.Time) {})
time.Sleep(2 * time.Second)
mocWgIface.disconnect()
@@ -80,5 +89,4 @@ func TestWGWatcher_ReEnable(t *testing.T) {
case <-time.After(10 * time.Second):
t.Errorf("timeout")
}
watcher.DisableWgWatcher()
}

View File

@@ -32,7 +32,7 @@ type WorkerRelay struct {
relaySupportedOnRemotePeer atomic.Bool
}
func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager, stateDump *stateDump) *WorkerRelay {
func NewWorkerRelay(ctx context.Context, log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager) *WorkerRelay {
r := &WorkerRelay{
peerCtx: ctx,
log: log,