Compare commits

..

13 Commits

Author SHA1 Message Date
Zoltán Papp
e3c66ced13 Fix defer 2026-06-25 14:36:47 +02:00
Zoltán Papp
94284d5400 Add status profiles 2026-06-25 13:57:14 +02:00
Zoltán Papp
fa91c119c3 Remove debug lines from relay 2026-06-25 13:56:28 +02:00
Zoltán Papp
b5d2f054c2 print stack 2026-06-25 13:38:40 +02:00
Zoltán Papp
50ff095d68 Measure lock times: wgIface, relayMgr, ingressGwMgr, eventStreams 2026-06-25 13:34:50 +02:00
Zoltán Papp
0afe52cfeb Print serial 2026-06-25 13:12:45 +02:00
Zoltán Papp
31c277b1df peer/status: move relay-state reads off the main mux
GetRelayStates held d.mux (RLock) while calling into the relay
Manager (RelayStates/RelayConnectError/ServerURLs). Those calls can be
slow or block on the relay manager's own locks while it is reconnecting,
which kept the central Status mutex held and stalled every peer state
writer (UpdatePeerState, ReplaceOfflinePeers, etc.) contending for it.

Guard relayMgr/relayStates with a dedicated muxRelays mutex and release
it before invoking the relay Manager, so the relay read path no longer
contends with the hot peer-state writers on d.mux.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-25 13:02:58 +02:00
Zoltán Papp
566d21c2c3 Set logger in ConnectedIP test to avoid nil deref 2026-06-25 10:15:07 +02:00
Zoltán Papp
858e2d1c34 Add logs 2026-06-24 21:46:14 +02:00
Zoltán Papp
35ed69bfe7 Revert "[client] Drop signaling-side ICE candidate filter, drop overlay STUN at mux read-side instead (#6142)"
This reverts commit b57f714350.

Restores the signaling-side ICE candidate filter and the engine/worker_ice
STUN gating that #6142 removed, re-adding the mux read-side changes in
udpmux/universal.go to their pre-#6142 form.
2026-06-24 21:29:05 +02:00
mlsmaycon
8446713d28 temp timing logs 2026-06-24 15:35:58 +02:00
Viktor Liu
12f2e69af2 Log signal stall, ICE pair selection, restart cadence, sync content, and receive backpressure to attribute the regression 2026-06-24 15:04:22 +02:00
Viktor Liu
4cb2c62f2a Keep signal stream alive while receive loop is blocked on worker handoff 2026-06-24 12:44:04 +02:00
23 changed files with 567 additions and 56 deletions

View File

@@ -33,7 +33,7 @@
<br/>
<br/>
<strong>
🚀 <a href="https://netbird.io/careers">We are hiring! Join us at https://netbird.io/careers</a>
🚀 <a href="https://careers.netbird.io">We are hiring! Join us at careers.netbird.io</a>
</strong>
</p>

View File

@@ -41,6 +41,7 @@ type ICEBind struct {
*wgConn.StdNetBind
transportNet transport.Net
filterFn udpmux.FilterFn
address wgaddr.Address
mtu uint16
@@ -60,11 +61,12 @@ type ICEBind struct {
ipv6Conn *net.UDPConn
}
func NewICEBind(transportNet transport.Net, address wgaddr.Address, mtu uint16) *ICEBind {
func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wgaddr.Address, mtu uint16) *ICEBind {
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
ib := &ICEBind{
StdNetBind: b,
transportNet: transportNet,
filterFn: filterFn,
address: address,
mtu: mtu,
endpoints: make(map[netip.Addr]net.Conn),
@@ -263,6 +265,7 @@ func (s *ICEBind) createOrUpdateMux() {
udpmux.UniversalUDPMuxParams{
UDPConn: muxConn,
Net: s.transportNet,
FilterFn: s.filterFn,
WGAddress: s.address,
MTU: s.mtu,
},

View File

@@ -289,7 +289,7 @@ func setupICEBind(t *testing.T) *ICEBind {
IP: netip.MustParseAddr("100.64.0.1"),
Network: netip.MustParsePrefix("100.64.0.0/10"),
}
return NewICEBind(transportNet, address, 1280)
return NewICEBind(transportNet, nil, address, 1280)
}
func createDualStackConns(t *testing.T) (*net.UDPConn, *net.UDPConn) {

View File

@@ -32,6 +32,8 @@ type TunKernelDevice struct {
link *wgLink
udpMuxConn net.PacketConn
udpMux *udpmux.UniversalUDPMuxDefault
filterFn udpmux.FilterFn
}
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, transportNet transport.Net) *TunKernelDevice {
@@ -102,6 +104,7 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
bindParams := udpmux.UniversalUDPMuxParams{
UDPConn: nbnet.WrapPacketConn(rawSock),
Net: t.transportNet,
FilterFn: t.filterFn,
WGAddress: t.address,
MTU: t.mtu,
}

View File

@@ -63,6 +63,7 @@ type WGIFaceOpts struct {
MTU uint16
MobileArgs *device.MobileIFaceArguments
TransportNet transport.Net
FilterFn udpmux.FilterFn
DisableDNS bool
}

View File

@@ -11,7 +11,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
var tun WGTunDevice
if netstack.IsEnabled() {

View File

@@ -9,7 +9,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
if netstack.IsEnabled() {
wgIFace := &WGIface{

View File

@@ -10,7 +10,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
wgIFace := &WGIface{
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, opts.MobileArgs.TunFd),

View File

@@ -14,7 +14,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
if netstack.IsEnabled() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
return &WGIface{
tun: device.NewNetstackDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr()),
userspaceBind: true,
@@ -30,7 +30,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
}
if device.ModuleTunIsLoaded() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
return &WGIface{
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind),
userspaceBind: true,

View File

@@ -8,6 +8,8 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"time"
log "github.com/sirupsen/logrus"
@@ -20,6 +22,10 @@ import (
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
// FilterFn reports whether packets to the given address must be filtered out.
// It returns true when the address is to be filtered, together with the prefix
// of the matching route, and a non-nil error when the lookup itself failed.
type FilterFn func(address netip.Addr) (bool, netip.Prefix, error)
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn
// It then passes packets to the UDPMux that does the actual connection muxing.
type UniversalUDPMuxDefault struct {
@@ -37,6 +43,7 @@ type UniversalUDPMuxParams struct {
UDPConn net.PacketConn
XORMappedAddrCacheTTL time.Duration
Net transport.Net
FilterFn FilterFn
WGAddress wgaddr.Address
MTU uint16
}
@@ -61,6 +68,7 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
PacketConn: params.UDPConn,
mux: m,
logger: params.Logger,
filterFn: params.FilterFn,
address: params.WGAddress,
}
@@ -107,12 +115,15 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
}
}
// UDPConn is a wrapper around UDPMux conn that overrides WriteTo to drop packets destined for the overlay subnet.
// UDPConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets
type UDPConn struct {
net.PacketConn
mux *UniversalUDPMuxDefault
logger logging.LeveledLogger
address wgaddr.Address
mux *UniversalUDPMuxDefault
logger logging.LeveledLogger
filterFn FilterFn
// TODO: reset cache on route changes
addrCache sync.Map
address wgaddr.Address
}
// GetPacketConn returns the underlying PacketConn
@@ -121,18 +132,67 @@ func (u *UDPConn) GetPacketConn() net.PacketConn {
}
func (u *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
udpAddr, ok := addr.(*net.UDPAddr)
if !ok {
if u.filterFn == nil {
return u.PacketConn.WriteTo(b, addr)
}
dst := udpAddr.AddrPort().Addr().Unmap()
if (u.address.Network.IsValid() && u.address.Network.Contains(dst)) || (u.address.IPv6Net.IsValid() && u.address.IPv6Net.Contains(dst)) {
log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
return 0, fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
if isRouted, found := u.addrCache.Load(addr.String()); found {
return u.handleCachedAddress(isRouted.(bool), b, addr)
}
return u.handleUncachedAddress(b, addr)
}
// handleCachedAddress applies a previously cached filter verdict for addr:
// a routed destination is refused with an error, anything else is written
// through the wrapped PacketConn.
func (u *UDPConn) handleCachedAddress(isRouted bool, b []byte, addr net.Addr) (int, error) {
	if !isRouted {
		return u.PacketConn.WriteTo(b, addr)
	}
	return 0, fmt.Errorf("address %s is part of a routed network, refusing to write", addr)
}
// handleUncachedAddress runs the filter check for an address that has no
// cached verdict and writes the packet only when the check passes.
func (u *UDPConn) handleUncachedAddress(b []byte, addr net.Addr) (int, error) {
	checkErr := u.performFilterCheck(addr)
	if checkErr == nil {
		return u.PacketConn.WriteTo(b, addr)
	}
	return 0, checkErr
}
// performFilterCheck decides whether a packet may be written to addr.
//
// It returns a non-nil error when the destination lies inside the NetBird
// overlay network (IPv4 or IPv6) or when filterFn reports it as routed.
// Failures to extract or parse the address, and errors from filterFn itself,
// are logged and treated as "allowed" (nil), so the check stays best-effort.
// Only the verdict produced by filterFn is stored in addrCache.
func (u *UDPConn) performFilterCheck(addr net.Addr) error {
	host, err := getHostFromAddr(addr)
	if err != nil {
		log.Errorf("Failed to get host from address %s: %v", addr, err)
		return nil
	}

	a, err := netip.ParseAddr(host)
	if err != nil {
		log.Errorf("Failed to parse address %s: %v", addr, err)
		return nil
	}
	// An IPv4-mapped IPv6 address would never match an IPv4 prefix.
	a = a.Unmap()

	// Check both overlay prefixes; the IPv6 overlay must be refused as well.
	inOverlay := (u.address.Network.IsValid() && u.address.Network.Contains(a)) ||
		(u.address.IPv6Net.IsValid() && u.address.IPv6Net.Contains(a))
	if inOverlay {
		log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
		return fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
	}

	// Without a filter function there is nothing further to check.
	if u.filterFn == nil {
		return nil
	}

	isRouted, prefix, err := u.filterFn(a)
	if err != nil {
		// Do not cache a verdict we could not compute.
		log.Errorf("Failed to check if address %s is routed: %v", addr, err)
		return nil
	}

	u.addrCache.Store(addr.String(), isRouted)
	if !isRouted {
		return nil
	}

	// Extra log, as the error only shows up with ICE logging enabled
	log.Infof("address %s is part of routed network %s, refusing to write", addr, prefix)
	return fmt.Errorf("address %s is part of routed network %s, refusing to write", addr, prefix)
}
// getHostFromAddr returns the host portion of addr's "host:port" string form.
// It returns an error when the string form cannot be split into host and port.
func getHostFromAddr(addr net.Addr) (string, error) {
	hostPart, _, splitErr := net.SplitHostPort(addr.String())
	if splitErr != nil {
		return "", splitErr
	}
	return hostPart, nil
}
// GetSharedConn returns the shared udp conn
func (m *UniversalUDPMuxDefault) GetSharedConn() net.PacketConn {
return m.params.UDPConn
@@ -165,13 +225,6 @@ func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.A
return nil
}
src := udpAddr.AddrPort().Addr().Unmap()
wg := m.params.WGAddress
if (wg.Network.IsValid() && wg.Network.Contains(src)) || (wg.IPv6Net.IsValid() && wg.IPv6Net.Contains(src)) {
log.Debugf("dropping STUN message from overlay source %s", udpAddr)
return nil
}
if m.isXORMappedResponse(msg, udpAddr.String()) {
err := m.handleXORMappedResponse(udpAddr, msg)
if err != nil {

View File

@@ -66,7 +66,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,

View File

@@ -22,7 +22,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,

View File

@@ -14,6 +14,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
@@ -53,6 +54,7 @@ import (
"github.com/netbirdio/netbird/client/internal/relay"
"github.com/netbirdio/netbird/client/internal/rosenpass"
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/client/internal/syncstore"
"github.com/netbirdio/netbird/client/internal/updater"
@@ -88,6 +90,13 @@ var ErrResetConnection = fmt.Errorf("reset connection")
var ErrEngineAlreadyStarted = errors.New("engine already started")
// engineRestartCount and engineLastRestart track client-restart cadence across
// engine recreations so a restart loop is distinguishable from rare restarts.
// Both are package-level atomics, so they are not tied to a single Engine value.
var (
// engineRestartCount is incremented by one on every triggerClientRestart call
// that proceeds to restart the engine.
engineRestartCount atomic.Int64
// engineLastRestart holds the time of the most recent restart as Unix
// nanoseconds; zero means no restart has been recorded yet.
engineLastRestart atomic.Int64
)
type EngineConfig struct {
WgPort int
WgIfaceName string
@@ -899,7 +908,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
started := time.Now()
defer func() {
duration := time.Since(started)
log.Infof("sync finished in %s", duration)
if update.GetNetworkMap() != nil {
log.Infof("sync finished in %s, %d", duration, update.GetNetworkMap().GetSerial())
} else {
log.Infof("sync finished in %s", duration)
}
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
}()
e.syncMsgMux.Lock()
@@ -909,14 +922,23 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if e.ctx.Err() != nil {
return e.ctx.Err()
}
serial := update.GetNetworkMap().GetSerial()
if nm := update.GetNetworkMap(); nm != nil {
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
} else {
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
}
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
}
startTime := time.Now()
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
return err
}
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
// Posture checks are bound to the network map presence:
// NetworkMap != nil, checks present -> apply the received checks
@@ -927,17 +949,21 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if nm == nil {
return nil
}
startTime = time.Now()
if err := e.updateChecksIfNew(update.Checks); err != nil {
return err
}
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
e.persistSyncResponse(update)
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
// only apply new changes and ignore old ones
startTime = time.Now()
if err := e.updateNetworkMap(nm); err != nil {
return err
}
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
@@ -1357,44 +1383,56 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
startTime := time.Now()
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
log.Errorf("failed to update dns server, err: %v", err)
}
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
// apply routes first, route related actions might depend on routing being enabled
startTime = time.Now()
routes := toRoutes(networkMap.GetRoutes())
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
// lazy mgr needs to be aware of which routes are available before they are applied
if e.connMgr != nil {
e.connMgr.UpdateRouteHAMap(clientRoutes)
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
}
startTime = time.Now()
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
log.Errorf("failed to update routes: %v", err)
}
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
if e.acl != nil {
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
}
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
// Ingress forward rules
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
if err != nil {
log.Errorf("failed to update forward rules, err: %v", err)
}
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
startTime = time.Now()
e.updateOfflinePeers(networkMap.GetOfflinePeers())
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
// Filter out own peer from the remote peers list
localPubKey := e.config.WgPrivateKey.PublicKey().String()
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
@@ -1412,20 +1450,24 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
return err
}
} else {
startTime = time.Now()
err := e.removePeers(remotePeers)
if err != nil {
return err
}
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.modifyPeers(remotePeers)
if err != nil {
return err
}
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.addNewPeers(remotePeers)
if err != nil {
return err
}
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
e.statusRecorder.FinishPeerListModifications()
@@ -1438,9 +1480,11 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
e.updateSSHServerAuth(networkMap.GetSshAuth())
}
startTime = time.Now()
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
e.networkSerial = serial
@@ -1955,6 +1999,7 @@ func (e *Engine) newWgIface() (*iface.WGIface, error) {
WGPrivKey: e.config.WgPrivateKey.String(),
MTU: e.config.MTU,
TransportNet: transportNet,
FilterFn: e.addrViaRoutes,
DisableDNS: e.config.DisableDNS,
}
@@ -2171,7 +2216,14 @@ func (e *Engine) triggerClientRestart() {
return
}
log.Info("restarting engine")
// Cadence survives engine recreation (package-level), so a restart loop shows
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
n := engineRestartCount.Add(1)
var sinceLast time.Duration
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
sinceLast = time.Since(time.Unix(0, prev))
}
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
CtxGetState(e.ctx).Set(StatusConnecting)
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
log.Infof("cancelling client context, engine will be recreated")
@@ -2202,6 +2254,21 @@ func (e *Engine) startNetworkMonitor() {
}()
}
// addrViaRoutes reports whether addr is covered by one of the client routes.
// For every entry returned by the route manager it takes the Network of the
// first route (skipping empty entries and nil routes) and asks
// systemops.IsAddrRouted whether addr is routed through any of them.
// The returned error is always nil.
func (e *Engine) addrViaRoutes(addr netip.Addr) (bool, netip.Prefix, error) {
	var candidates []netip.Prefix
	for _, group := range e.routeManager.GetClientRoutes() {
		if len(group) == 0 || group[0] == nil {
			continue
		}
		candidates = append(candidates, group[0].Network)
	}

	routed, matched := systemops.IsAddrRouted(addr, candidates)
	if !routed {
		return false, netip.Prefix{}, nil
	}

	return true, matched, nil
}
func (e *Engine) stopDNSServer() {
if e.dnsServer == nil {
return

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/netip"
"runtime/debug"
"slices"
"sync"
"time"
@@ -192,6 +193,7 @@ func (s *StatusChangeSubscription) Events() chan map[string]RouterState {
// Pure read methods take RLock; anything that mutates state takes Lock.
type Status struct {
mux sync.RWMutex
muxRelays sync.RWMutex
peers map[string]State
ipToKey map[string]string
changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription
@@ -226,6 +228,8 @@ type Status struct {
routeIDLookup routeIDLookup
wgIface WGIfaceStatus
profile *StatusProfile
}
// NewRecorder returns a new Status instance
@@ -240,16 +244,23 @@ func NewRecorder(mgmAddress string) *Status {
notifier: newNotifier(),
mgmAddress: mgmAddress,
resolvedDomainsStates: map[domain.Domain]ResolvedDomainInfo{},
profile: NewStatusProfile(context.Background()),
}
}
// StartProfile starts the status profile by delegating to d.profile.Start
// with the given context.
func (d *Status) StartProfile(ctx context.Context) {
d.profile.Start(ctx)
}
func (d *Status) SetRelayMgr(manager *relayClient.Manager) {
d.mux.Lock()
defer d.mux.Unlock()
d.profile.inc("SetRelayMgr")
d.muxRelays.Lock()
defer d.muxRelays.Unlock()
d.relayMgr = manager
}
func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
d.profile.inc("SetIngressGwMgr")
d.mux.Lock()
defer d.mux.Unlock()
d.ingressGwMgr = ingressGwMgr
@@ -257,6 +268,7 @@ func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
// ReplaceOfflinePeers replaces the stored offline peers with the given list
func (d *Status) ReplaceOfflinePeers(replacement []State) {
d.profile.inc("ReplaceOfflinePeers")
d.mux.Lock()
defer d.mux.Unlock()
d.offlinePeers = make([]State, len(replacement))
@@ -268,6 +280,7 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
// AddPeer adds peer to Daemon status map
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string) error {
d.profile.inc("AddPeer")
d.mux.Lock()
defer d.mux.Unlock()
@@ -295,6 +308,7 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string)
// GetPeer returns the state of the given peer from the Daemon status map
func (d *Status) GetPeer(peerPubKey string) (State, error) {
d.profile.inc("GetPeer")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -306,6 +320,7 @@ func (d *Status) GetPeer(peerPubKey string) (State, error) {
}
func (d *Status) PeerByIP(ip string) (string, bool) {
d.profile.inc("PeerByIP")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -323,6 +338,7 @@ func (d *Status) PeerByIP(ip string) (string, bool) {
// active peers are matched; peers moved into the offline slice by
// ReplaceOfflinePeers are intentionally treated as unknown.
func (d *Status) PeerStateByIP(ip string) (State, bool) {
d.profile.inc("PeerStateByIP")
if ip == "" {
return State{}, false
}
@@ -341,6 +357,7 @@ func (d *Status) PeerStateByIP(ip string) (State, bool) {
// RemovePeer removes peer from Daemon status map
func (d *Status) RemovePeer(peerPubKey string) error {
d.profile.inc("RemovePeer")
d.mux.Lock()
defer d.mux.Unlock()
@@ -362,6 +379,7 @@ func (d *Status) RemovePeer(peerPubKey string) error {
// UpdatePeerState updates peer status
func (d *Status) UpdatePeerState(receivedState State) error {
d.profile.inc("UpdatePeerState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -404,6 +422,7 @@ func (d *Status) UpdatePeerState(receivedState State) error {
}
func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error {
d.profile.inc("AddPeerStateRoute")
d.mux.Lock()
peerState, ok := d.peers[peer]
@@ -429,6 +448,7 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R
}
func (d *Status) RemovePeerStateRoute(peer string, route string) error {
d.profile.inc("RemovePeerStateRoute")
d.mux.Lock()
peerState, ok := d.peers[peer]
@@ -459,11 +479,13 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) {
if d == nil {
return nil, false
}
d.profile.inc("CheckRoutes")
resId, isExitNode := d.routeIDLookup.Lookup(ip)
return []byte(resId), isExitNode
}
func (d *Status) UpdatePeerICEState(receivedState State) error {
d.profile.inc("UpdatePeerICEState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -503,6 +525,7 @@ func (d *Status) UpdatePeerICEState(receivedState State) error {
}
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
d.profile.inc("UpdatePeerRelayedState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -539,6 +562,7 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error {
}
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
d.profile.inc("UpdatePeerRelayedStateToDisconnected")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -574,6 +598,7 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error
}
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
d.profile.inc("UpdatePeerICEStateToDisconnected")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -613,6 +638,7 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
// UpdateWireGuardPeerState updates the WireGuard bits of the peer state
func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats configurer.WGStats) error {
d.profile.inc("UpdateWireGuardPeerState")
d.mux.Lock()
defer d.mux.Unlock()
@@ -640,6 +666,7 @@ func hasConnStatusChanged(oldStatus, newStatus ConnStatus) bool {
// UpdatePeerFQDN updates only the FQDN of the peer's state
func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
d.profile.inc("UpdatePeerFQDN")
d.mux.Lock()
defer d.mux.Unlock()
@@ -656,6 +683,7 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
// UpdatePeerSSHHostKey updates peer's SSH host key
func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) error {
d.profile.inc("UpdatePeerSSHHostKey")
d.mux.Lock()
defer d.mux.Unlock()
@@ -672,6 +700,7 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro
// FinishPeerListModifications invokes the peer list change notification
func (d *Status) FinishPeerListModifications() {
d.profile.inc("FinishPeerListModifications")
d.mux.Lock()
if !d.peerListChangedForNotification {
@@ -704,6 +733,7 @@ func (d *Status) FinishPeerListModifications() {
}
func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription {
d.profile.inc("SubscribeToPeerStateChanges")
d.mux.Lock()
defer d.mux.Unlock()
@@ -717,6 +747,7 @@ func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string)
}
func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) {
d.profile.inc("UnsubscribePeerStateChanges")
d.mux.Lock()
defer d.mux.Unlock()
@@ -742,6 +773,7 @@ func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscript
// GetLocalPeerState returns the local peer state
func (d *Status) GetLocalPeerState() LocalPeerState {
d.profile.inc("GetLocalPeerState")
d.mux.RLock()
defer d.mux.RUnlock()
return d.localPeer.Clone()
@@ -749,6 +781,7 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
// UpdateLocalPeerState updates local peer status
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
d.profile.inc("UpdateLocalPeerState")
d.mux.Lock()
d.localPeer = localPeerState
fqdn := d.localPeer.FQDN
@@ -763,6 +796,7 @@ func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
// AddLocalPeerStateRoute adds a route to the local peer state
func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
d.profile.inc("AddLocalPeerStateRoute")
d.mux.Lock()
defer d.mux.Unlock()
@@ -780,6 +814,7 @@ func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
// RemoveLocalPeerStateRoute removes a route from the local peer state
func (d *Status) RemoveLocalPeerStateRoute(route string) {
d.profile.inc("RemoveLocalPeerStateRoute")
d.mux.Lock()
defer d.mux.Unlock()
@@ -793,6 +828,7 @@ func (d *Status) RemoveLocalPeerStateRoute(route string) {
// AddResolvedIPLookupEntry adds a resolved IP lookup entry
func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.ResID) {
d.profile.inc("AddResolvedIPLookupEntry")
d.mux.Lock()
defer d.mux.Unlock()
@@ -801,6 +837,7 @@ func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.
// RemoveResolvedIPLookupEntry removes a resolved IP lookup entry
func (d *Status) RemoveResolvedIPLookupEntry(route string) {
d.profile.inc("RemoveResolvedIPLookupEntry")
d.mux.Lock()
defer d.mux.Unlock()
@@ -812,6 +849,7 @@ func (d *Status) RemoveResolvedIPLookupEntry(route string) {
// CleanLocalPeerStateRoutes cleans all routes from the local peer state
func (d *Status) CleanLocalPeerStateRoutes() {
d.profile.inc("CleanLocalPeerStateRoutes")
d.mux.Lock()
defer d.mux.Unlock()
@@ -820,6 +858,7 @@ func (d *Status) CleanLocalPeerStateRoutes() {
// CleanLocalPeerState cleans local peer status
func (d *Status) CleanLocalPeerState() {
d.profile.inc("CleanLocalPeerState")
d.mux.Lock()
d.localPeer = LocalPeerState{}
fqdn := d.localPeer.FQDN
@@ -831,6 +870,7 @@ func (d *Status) CleanLocalPeerState() {
// MarkManagementDisconnected sets ManagementState to disconnected
func (d *Status) MarkManagementDisconnected(err error) {
d.profile.inc("MarkManagementDisconnected")
d.mux.Lock()
d.managementState = false
d.managementError = err
@@ -843,6 +883,7 @@ func (d *Status) MarkManagementDisconnected(err error) {
// MarkManagementConnected sets ManagementState to connected
func (d *Status) MarkManagementConnected() {
d.profile.inc("MarkManagementConnected")
d.mux.Lock()
d.managementState = true
d.managementError = nil
@@ -855,6 +896,7 @@ func (d *Status) MarkManagementConnected() {
// UpdateSignalAddress updates the address of the signal server
func (d *Status) UpdateSignalAddress(signalURL string) {
d.profile.inc("UpdateSignalAddress")
d.mux.Lock()
defer d.mux.Unlock()
d.signalAddress = signalURL
@@ -862,6 +904,7 @@ func (d *Status) UpdateSignalAddress(signalURL string) {
// UpdateManagementAddress updates the address of the management server
func (d *Status) UpdateManagementAddress(mgmAddress string) {
d.profile.inc("UpdateManagementAddress")
d.mux.Lock()
defer d.mux.Unlock()
d.mgmAddress = mgmAddress
@@ -869,6 +912,7 @@ func (d *Status) UpdateManagementAddress(mgmAddress string) {
// UpdateRosenpass updates the Rosenpass configuration
func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
d.profile.inc("UpdateRosenpass")
d.mux.Lock()
defer d.mux.Unlock()
d.rosenpassPermissive = rosenpassPermissive
@@ -876,6 +920,7 @@ func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
}
func (d *Status) UpdateLazyConnection(enabled bool) {
d.profile.inc("UpdateLazyConnection")
d.mux.Lock()
defer d.mux.Unlock()
d.lazyConnectionEnabled = enabled
@@ -883,6 +928,7 @@ func (d *Status) UpdateLazyConnection(enabled bool) {
// MarkSignalDisconnected sets SignalState to disconnected
func (d *Status) MarkSignalDisconnected(err error) {
d.profile.inc("MarkSignalDisconnected")
d.mux.Lock()
d.signalState = false
d.signalError = err
@@ -895,6 +941,7 @@ func (d *Status) MarkSignalDisconnected(err error) {
// MarkSignalConnected sets SignalState to connected
func (d *Status) MarkSignalConnected() {
d.profile.inc("MarkSignalConnected")
d.mux.Lock()
d.signalState = true
d.signalError = nil
@@ -906,18 +953,21 @@ func (d *Status) MarkSignalConnected() {
}
func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) {
d.mux.Lock()
defer d.mux.Unlock()
d.profile.inc("UpdateRelayStates")
d.muxRelays.Lock()
defer d.muxRelays.Unlock()
d.relayStates = relayResults
}
// UpdateDNSStates replaces the stored nameserver group states with dnsStates.
// The call is noted via d.profile.inc before the write lock on d.mux is taken.
func (d *Status) UpdateDNSStates(dnsStates []NSGroupState) {
d.profile.inc("UpdateDNSStates")
d.mux.Lock()
defer d.mux.Unlock()
d.nsGroupStates = dnsStates
}
func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix, resourceId route.ResID) {
d.profile.inc("UpdateResolvedDomainsStates")
d.mux.Lock()
defer d.mux.Unlock()
@@ -933,6 +983,7 @@ func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resol
}
func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
d.profile.inc("DeleteResolvedDomainsStates")
d.mux.Lock()
defer d.mux.Unlock()
@@ -949,6 +1000,7 @@ func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
}
func (d *Status) GetRosenpassState() RosenpassState {
d.profile.inc("GetRosenpassState")
d.mux.RLock()
defer d.mux.RUnlock()
return RosenpassState{
@@ -958,12 +1010,14 @@ func (d *Status) GetRosenpassState() RosenpassState {
}
// GetLazyConnection returns the current value of lazyConnectionEnabled, read
// under the d.mux read lock.
func (d *Status) GetLazyConnection() bool {
d.profile.inc("GetLazyConnection")
d.mux.RLock()
defer d.mux.RUnlock()
return d.lazyConnectionEnabled
}
func (d *Status) GetManagementState() ManagementState {
d.profile.inc("GetManagementState")
d.mux.RLock()
defer d.mux.RUnlock()
return ManagementState{
@@ -974,6 +1028,7 @@ func (d *Status) GetManagementState() ManagementState {
}
func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
d.profile.inc("UpdateLatency")
if latency <= 0 {
return nil
}
@@ -991,6 +1046,7 @@ func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
// IsLoginRequired determines if a peer's login has expired.
func (d *Status) IsLoginRequired() bool {
d.profile.inc("IsLoginRequired")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -1007,6 +1063,7 @@ func (d *Status) IsLoginRequired() bool {
}
func (d *Status) GetSignalState() SignalState {
d.profile.inc("GetSignalState")
d.mux.RLock()
defer d.mux.RUnlock()
return SignalState{
@@ -1018,24 +1075,34 @@ func (d *Status) GetSignalState() SignalState {
// GetRelayStates returns the stun/turn/permanent relay states
func (d *Status) GetRelayStates() []relay.ProbeResult {
d.mux.RLock()
defer d.mux.RUnlock()
d.profile.inc("GetRelayStates")
d.muxRelays.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("GetRelayStates", started)
}()
if d.relayMgr == nil {
defer d.muxRelays.RUnlock()
return d.relayStates
}
relayMgr := d.relayMgr
// extend the list of stun, turn servers with the relay server connections
relayStates := slices.Clone(d.relayStates)
d.muxRelays.RUnlock()
states := d.relayMgr.RelayStates()
states := relayMgr.RelayStates()
if len(states) == 0 {
// no relay connection tracked yet; surface configured servers as
// unavailable with the real reconnect error when known
err := relayClient.ErrRelayClientNotConnected
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
if connErr := relayMgr.RelayConnectError(); connErr != nil {
err = connErr
}
for _, r := range d.relayMgr.ServerURLs() {
for _, r := range relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
Err: err,
@@ -1055,7 +1122,15 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
}
func (d *Status) ForwardingRules() []firewall.ForwardRule {
d.profile.inc("ForwardingRules")
d.mux.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("ForwardingRules", started)
}()
defer d.mux.RUnlock()
if d.ingressGwMgr == nil {
return nil
@@ -1065,6 +1140,7 @@ func (d *Status) ForwardingRules() []firewall.ForwardRule {
}
func (d *Status) GetDNSStates() []NSGroupState {
d.profile.inc("GetDNSStates")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -1073,6 +1149,7 @@ func (d *Status) GetDNSStates() []NSGroupState {
}
func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo {
d.profile.inc("GetResolvedDomainsStates")
d.mux.RLock()
defer d.mux.RUnlock()
return maps.Clone(d.resolvedDomainsStates)
@@ -1080,6 +1157,7 @@ func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo
// GetFullStatus gets full status
func (d *Status) GetFullStatus() FullStatus {
d.profile.inc("GetFullStatus")
fullStatus := FullStatus{
ManagementState: d.GetManagementState(),
SignalState: d.GetSignalState(),
@@ -1106,26 +1184,31 @@ func (d *Status) GetFullStatus() FullStatus {
// ClientStart records the call in the status profile and forwards the
// client-start notification to the notifier.
func (d *Status) ClientStart() {
d.profile.inc("ClientStart")
d.notifier.clientStart()
}
// ClientStop records the call in the status profile and forwards the
// client-stop notification to the notifier.
func (d *Status) ClientStop() {
d.profile.inc("ClientStop")
d.notifier.clientStop()
}
// ClientTeardown records the call in the status profile and forwards the
// teardown notification (the service is being torn down) to the notifier.
func (d *Status) ClientTeardown() {
d.profile.inc("ClientTeardown")
d.notifier.clientTearDown()
}
// SetConnectionListener records the call in the status profile and registers
// listener on the notifier.
func (d *Status) SetConnectionListener(listener Listener) {
d.profile.inc("SetConnectionListener")
d.notifier.setListener(listener)
}
// RemoveConnectionListener records the call in the status profile and removes
// the listener from the notifier.
func (d *Status) RemoveConnectionListener() {
d.profile.inc("RemoveConnectionListener")
d.notifier.removeListener()
}
@@ -1197,6 +1280,7 @@ func (d *Status) PublishEvent(
userMsg string,
metadata map[string]string,
) {
d.profile.inc("PublishEvent")
event := &proto.SystemEvent{
Id: uuid.New().String(),
Severity: severity,
@@ -1208,6 +1292,13 @@ func (d *Status) PublishEvent(
}
d.eventMux.Lock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("PublishEvent", started)
}()
defer d.eventMux.Unlock()
d.eventQueue.Add(event)
@@ -1225,6 +1316,7 @@ func (d *Status) PublishEvent(
// SubscribeToEvents returns a new event subscription
func (d *Status) SubscribeToEvents() *EventSubscription {
d.profile.inc("SubscribeToEvents")
d.eventMux.Lock()
defer d.eventMux.Unlock()
@@ -1240,6 +1332,7 @@ func (d *Status) SubscribeToEvents() *EventSubscription {
// UnsubscribeFromEvents removes an event subscription
func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
d.profile.inc("UnsubscribeFromEvents")
if sub == nil {
return
}
@@ -1255,10 +1348,12 @@ func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
// GetEventHistory returns all events currently held in the event queue.
// NOTE(review): unlike PublishEvent, this does not take d.eventMux; presumably
// eventQueue.GetAll synchronizes internally — confirm.
func (d *Status) GetEventHistory() []*proto.SystemEvent {
d.profile.inc("GetEventHistory")
return d.eventQueue.GetAll()
}
func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
d.profile.inc("SetWgIface")
d.mux.Lock()
defer d.mux.Unlock()
@@ -1266,7 +1361,15 @@ func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
}
func (d *Status) PeersStatus() (*configurer.Stats, error) {
d.profile.inc("PeersStatus")
d.mux.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("PeersStatus", started)
}()
defer d.mux.RUnlock()
if d.wgIface == nil {
return nil, fmt.Errorf("wgInterface is nil, cannot retrieve peers status")
@@ -1279,7 +1382,15 @@ func (d *Status) PeersStatus() (*configurer.Stats, error) {
// and updates the cached peer states. This ensures accurate handshake times and
// transfer statistics in status reports without running full health probes.
func (d *Status) RefreshWireGuardStats() error {
d.profile.inc("RefreshWireGuardStats")
d.mux.Lock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("RefreshWireGuardStats", started)
}()
defer d.mux.Unlock()
if d.wgIface == nil {
@@ -1444,3 +1555,10 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
return &pbFullStatus
}
// debugElapsed is a temporary diagnostic helper: when more than one second has
// passed since startTime it logs the elapsed time under the label msg and dumps
// the current goroutine's stack to standard error. Faster calls are silent.
func debugElapsed(msg string, startTime time.Time) {
	const slowThreshold = 1 * time.Second

	elapsed := time.Since(startTime)
	if elapsed <= slowThreshold {
		return
	}

	log.Infof("run %s took %s", msg, elapsed)
	debug.PrintStack()
}

View File

@@ -0,0 +1,97 @@
package peer
import (
"context"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
)
// StatusProfile keeps a cumulative call counter per method name and can log
// the counters periodically (see Start).
type StatusProfile struct {
// counts maps a method name (string) to its call counter (*atomic.Int64).
counts sync.Map
}
// NewStatusProfile creates an empty StatusProfile and launches its periodic
// logging loop in a new goroutine. The loop ends when ctx is cancelled.
func NewStatusProfile(ctx context.Context) *StatusProfile {
	profile := new(StatusProfile)
	go profile.Start(ctx)
	return profile
}
// Start blocks, logging the accumulated call counts every 10 seconds, and
// returns once ctx is done.
func (s *StatusProfile) Start(ctx context.Context) {
	const logInterval = 10 * time.Second

	tick := time.NewTicker(logInterval)
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			s.logCounts()
		case <-ctx.Done():
			return
		}
	}
}
// inc adds one to the call counter of method, creating the counter on first
// use. It is a no-op on a nil receiver so callers need not check whether
// profiling is enabled.
func (s *StatusProfile) inc(method string) {
	if s == nil {
		return
	}

	// Fast path: the counter usually exists, so avoid allocating a new one.
	counter, found := s.counts.Load(method)
	if !found {
		// LoadOrStore settles the race between concurrent first callers; every
		// caller ends up incrementing the single stored counter.
		counter, _ = s.counts.LoadOrStore(method, &atomic.Int64{})
	}
	counter.(*atomic.Int64).Add(1)
}
// snapshot copies the current value of every counter into a fresh map keyed by
// method name.
func (s *StatusProfile) snapshot() map[string]int64 {
	result := make(map[string]int64)
	collect := func(key, value any) bool {
		name := key.(string)
		counter := value.(*atomic.Int64)
		result[name] = counter.Load()
		return true // keep iterating over all entries
	}
	s.counts.Range(collect)
	return result
}
// logCounts writes one info-level line with the cumulative call counts, ordered
// by count (highest first) and then by method name. Counters that are still
// zero are left out; if nothing has been recorded at all a short notice is
// logged instead.
func (s *StatusProfile) logCounts() {
	snap := s.snapshot()
	if len(snap) == 0 {
		log.Infof("status profile: no Status method calls so far")
		return
	}

	type entry struct {
		name  string
		calls int64
	}

	var sum int64
	entries := make([]entry, 0, len(snap))
	for name, calls := range snap {
		if calls != 0 {
			entries = append(entries, entry{name: name, calls: calls})
			sum += calls
		}
	}

	sort.Slice(entries, func(a, b int) bool {
		left, right := entries[a], entries[b]
		if left.calls == right.calls {
			return left.name < right.name
		}
		return left.calls > right.calls
	})

	parts := make([]string, len(entries))
	for i, e := range entries {
		parts[i] = e.name + "=" + strconv.FormatInt(e.calls, 10)
	}

	log.Infof("status profile (cumulative total=%d): %s", sum, strings.Join(parts, ", "))
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"strconv"
"sync"
"time"
@@ -164,6 +165,10 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
return
}
if candidateViaRoutes(candidate, haRoutes) {
return
}
if err := w.agent.AddRemoteCandidate(candidate); err != nil {
w.log.Errorf("error while handling remote candidate")
return
@@ -461,7 +466,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
}
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
pairStat, ok := agent.GetSelectedCandidatePairStats()
@@ -584,6 +589,34 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
return ec, nil
}
// candidateViaRoutes reports whether the candidate's address lies inside one of
// the routed networks in clientRoutes. Such a candidate is reached through the
// routed network, so the caller drops it instead of handing it to the ICE agent.
//
// Only the first route of each HA group is consulted. Default routes (prefix
// length 0) are skipped because they are handled by route exclusion / ip rules.
// An address that cannot be parsed as an IP is logged and treated as not routed.
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
	addr, err := netip.ParseAddr(candidate.Address())
	if err != nil {
		log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
		return false
	}
	// Prefix.Contains never matches an IPv4-mapped IPv6 address (::ffff:a.b.c.d)
	// against an IPv4 prefix, so normalise it to plain IPv4 first.
	addr = addr.Unmap()

	for _, routes := range clientRoutes {
		if len(routes) == 0 || routes[0] == nil {
			continue
		}

		prefix := routes[0].Network
		// default route is handled by route exclusion / ip rules
		if prefix.Bits() == 0 {
			continue
		}

		if prefix.Contains(addr) {
			log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
			return true
		}
	}
	return false
}
// isRelayCandidate reports whether candidate has the ICE relay candidate type.
func isRelayCandidate(candidate ice.Candidate) bool {
return candidate.Type() == ice.CandidateTypeRelay
}

View File

@@ -121,12 +121,9 @@ func (r *SysOps) addRouteToNonVPNIntf(prefix netip.Prefix, vpnIntf wgIface, init
return Nexthop{}, vars.ErrRouteNotAllowed
}
// BSDs blackhole a /32 added inside a directly-connected subnet; Linux/Windows need it to beat the wt0 route.
switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd", "dragonfly":
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
}
// Check if the prefix is part of any local subnets
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
}
// Determine the exit interface and next hop for the prefix, so we can add a specific route

View File

@@ -136,6 +136,7 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
networksDisabled: networksDisabled,
jwtCache: newJWTCache(),
}
go s.statusRecorder.StartProfile(ctx)
agent := &serverAgent{s}
s.sleepHandler = sleephandler.New(agent)
s.startSleepDetector()

View File

@@ -243,7 +243,11 @@ func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
func (c *Client) Connect(ctx context.Context) error {
c.log.Infof("connecting to relay server")
start := time.Now()
defer func() {
c.log.Infof("connect elapsed time: %v", time.Since(start))
}()
c.readLoopMutex.Lock()
defer c.readLoopMutex.Unlock()
@@ -287,6 +291,11 @@ func (c *Client) Connect(ctx context.Context) error {
func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, error) {
peerID := messages.HashID(dstPeerID)
start := time.Now()
defer func() {
c.log.Infof("connect elapsed time: %v", time.Since(start))
}()
c.mu.Lock()
if !c.serviceIsRunning {
c.mu.Unlock()

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/client/iface"
@@ -252,7 +253,7 @@ func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}}
c := &Client{log: log.WithField("relay", tt.name), relayConn: stubConn{remote: staticAddr{s: tt.s}}}
got := c.ConnectedIP()
var gotStr string
if got.IsValid() {

View File

@@ -78,6 +78,23 @@ type GrpcClient struct {
// transport-alive but no longer delivering messages. It is the source of
// truth IsHealthy reads, and is cleared once any frame is received again.
receiveStalled atomic.Bool
// receiveHandoffBlocked is set while the receive loop is parked handing a
// message to a busy decryption worker. The loop stops calling Recv (and
// markReceived) in that window, so the stream looks silent though it is
// healthy. The watchdog reads this to avoid misreading self-inflicted
// receive backpressure as a dead stream: reconnecting cannot help, since the
// new stream feeds the same worker, and only triggers a reconnect storm.
receiveHandoffBlocked atomic.Bool
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
// worker pulled off its queue. Diagnostic only: it lets a stall log show
// whether the worker was draining (busy) or idle when the stream went silent.
lastDecrypt atomic.Int64
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
// time the receive loop spent blocked handing messages to the worker. This is
// time not spent reading the stream, so it quantifies receive backpressure.
handoffWaitTotal atomic.Int64
handoffWaitMax atomic.Int64
handoffWaitCount atomic.Int64
}
// NewClient creates a new Signal client
@@ -353,6 +370,8 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
c.lastDecrypt.Store(time.Now().UnixNano())
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
if err != nil {
return nil, err
@@ -439,6 +458,22 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
return time.Since(time.Unix(0, c.lastReceived.Load()))
}
// idleSinceDecrypt returns the time elapsed since lastDecrypt was last stamped,
// i.e. since the last call to decryptMessage. It is used for diagnostics only,
// to tell a busy or wedged worker from an idle one.
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
	lastNanos := c.lastDecrypt.Load()
	lastAt := time.Unix(0, lastNanos)
	return time.Since(lastAt)
}
// receiveAlive reports whether the receive stream should be considered live.
// It is live when a frame arrived within receiveInactivityThreshold, or when
// the receive loop is currently parked handing a message to a busy decryption
// worker. In the second case the loop is not calling Recv, so the stream only
// looks silent; reconnecting would not help, and the watchdog must not treat it
// as stalled.
func (c *GrpcClient) receiveAlive() bool {
	if c.idleSinceReceive() < receiveInactivityThreshold {
		return true
	}
	return c.receiveHandoffBlocked.Load()
}
// watchReceiveStream guards against a receive stream that is transport-alive but
// no longer delivering messages. While the stream is idle past
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
@@ -450,18 +485,55 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
defer ticker.Stop()
var probeSentAt time.Time
var holdLogged bool
var statTicks int
var lastStatTotal int64
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if c.idleSinceReceive() < receiveInactivityThreshold {
// Periodic backpressure summary so time lost to the worker handoff is
// visible even when no stall fires. Emitted ~once a minute and only
// when the wait grew, to stay quiet on a healthy stream.
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
statTicks = 0
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
max.Round(time.Millisecond), count)
lastStatTotal = int64(total)
}
}
if c.receiveAlive() {
// Attribute the case that matters in the field: silent past the
// threshold but held because the receive loop is parked on the
// worker handoff (backpressure), not a dead stream. Log once per
// hold episode so a persistent worker stall is visible at info.
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
if !holdLogged {
total, max, count := c.handoffWaitStats()
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
holdLogged = true
}
} else {
holdLogged = false
}
probeSentAt = time.Time{}
continue
}
holdLogged = false
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
total, max, count := c.handoffWaitStats()
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
c.receiveStalled.Store(true)
c.notifyDisconnected(errReceiveStreamStalled)
cancelStream()
@@ -517,12 +589,37 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
continue
}
// The handoff blocks while the worker is busy, which parks this loop and
// stops Recv. Flag it so the watchdog does not read the resulting silence
// as a dead stream, and account the wait as receive backpressure.
handoffStart := time.Now()
c.receiveHandoffBlocked.Store(true)
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
log.Errorf("failed to add message to decryption worker: %v", err)
}
c.receiveHandoffBlocked.Store(false)
c.recordHandoffWait(time.Since(handoffStart))
}
}
// recordHandoffWait adds d to the running total of time the receive loop spent
// blocked handing a message to the worker, counts the handoff, and raises the
// recorded maximum if d exceeds it.
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
	waited := int64(d)
	c.handoffWaitTotal.Add(waited)
	c.handoffWaitCount.Add(1)

	// Lock-free max update: retry while our value is still larger than the
	// stored one and another writer keeps winning the swap.
	for prev := c.handoffWaitMax.Load(); waited > prev; prev = c.handoffWaitMax.Load() {
		if c.handoffWaitMax.CompareAndSwap(prev, waited) {
			return
		}
	}
}
// handoffWaitStats returns the cumulative handoff backpressure counters: total
// time blocked, the longest single wait, and the number of handoffs recorded.
// The three values are loaded independently, so they are not one atomic snapshot.
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
	total = time.Duration(c.handoffWaitTotal.Load())
	max = time.Duration(c.handoffWaitMax.Load())
	count = c.handoffWaitCount.Load()
	return total, max, count
}
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
if c.decryptionWorker != nil {
return

View File

@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
}
}
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
// where a busy decryption worker parks the receive loop on the worker handoff,
// so Recv (and markReceived) stops firing even though the stream is healthy.
// With the receive stream silent past the inactivity threshold but the loop
// blocked on handoff, the watchdog must consider the stream alive rather than
// tear it down (reconnecting feeds the same worker and would not help).
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
// A zero-value client is sufficient: receiveAlive only consults the
// lastReceived timestamp and the receiveHandoffBlocked flag.
c := &GrpcClient{}
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
// Receive stream silent but the loop is parked handing a message to a busy
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
c.receiveHandoffBlocked.Store(true)
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
c.receiveHandoffBlocked.Store(false)
c.markReceived()
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
}

View File

@@ -32,6 +32,13 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
return nil
}
// QueueLen returns the number of messages buffered for decryption. Diagnostic
// only: a non-empty queue while the receive stream is silent indicates the
// receive loop is parked on the handoff rather than the stream being dead.
// NOTE(review): encryptedMsgPool looks like a buffered channel; if so, the
// result is only a momentary reading and may be stale once returned — confirm.
func (w *Worker) QueueLen() int {
return len(w.encryptedMsgPool)
}
func (w *Worker) Work(ctx context.Context) {
for {
select {