Compare commits

..

2 Commits

Author SHA1 Message Date
jnfrati
5bb3ab60a8 Merge branch 'main' of github.com:netbirdio/netbird into client-json-socket 2026-06-24 14:17:07 +02:00
jnfrati
a2fd1bb0a8 add json gateway for netbird daemon 2026-05-27 19:04:55 +02:00
32 changed files with 2783 additions and 598 deletions

View File

@@ -5,6 +5,7 @@ package cmd
import (
"context"
"fmt"
"net/http"
"runtime"
"strings"
"sync"
@@ -22,15 +23,21 @@ var serviceCmd = &cobra.Command{
Short: "Manage the NetBird daemon service",
}
const defaultJSONSocket = "unix:///var/run/netbird-http.sock"
var (
serviceName string
serviceEnvVars []string
serviceName string
serviceEnvVars []string
jsonSocket string
jsonSocketDisabled bool
)
type program struct {
ctx context.Context
cancel context.CancelFunc
serv *grpc.Server
jsonServ *http.Server
jsonServMu sync.Mutex
serverInstance *server.Server
serverInstanceMu sync.Mutex
}
@@ -46,6 +53,8 @@ func init() {
serviceCmd.PersistentFlags().BoolVar(&updateSettingsDisabled, "disable-update-settings", false, "Disables update settings feature. If enabled, the client will not be able to change or edit any settings. To persist this setting, use: netbird service install --disable-update-settings")
serviceCmd.PersistentFlags().BoolVar(&captureEnabled, "enable-capture", false, "Enables packet capture via 'netbird debug capture'. To persist, use: netbird service install --enable-capture")
serviceCmd.PersistentFlags().BoolVar(&networksDisabled, "disable-networks", false, "Disables network selection. If enabled, the client will not allow listing, selecting, or deselecting networks. To persist, use: netbird service install --disable-networks")
serviceCmd.PersistentFlags().StringVar(&jsonSocket, "json-socket", defaultJSONSocket, "HTTP/JSON API socket address served by grpc-gateway [unix|tcp]://[path|host:port]. To persist, use: netbird service install --json-socket")
serviceCmd.PersistentFlags().BoolVar(&jsonSocketDisabled, "disable-json-socket", false, "Disables the HTTP/JSON API socket. To persist, use: netbird service install --disable-json-socket")
rootCmd.PersistentFlags().StringVarP(&serviceName, "service", "s", defaultServiceName, "Netbird system service name")
serviceEnvDesc := `Sets extra environment variables for the service. ` +

View File

@@ -5,9 +5,6 @@ package cmd
import (
"context"
"fmt"
"net"
"os"
"strings"
"time"
"github.com/kardianos/service"
@@ -32,31 +29,35 @@ func (p *program) Start(svc service.Service) error {
// in any case, even if configuration does not exists we run daemon to serve CLI gRPC API.
p.serv = grpc.NewServer()
split := strings.Split(daemonAddr, "://")
switch split[0] {
case "unix":
// cleanup failed close
stat, err := os.Stat(split[1])
if err == nil && !stat.IsDir() {
if err := os.Remove(split[1]); err != nil {
log.Debugf("remove socket file: %v", err)
}
}
case "tcp":
default:
return fmt.Errorf("unsupported daemon address protocol: %v", split[0])
}
listen, err := net.Listen(split[0], split[1])
daemonListener, err := listenOnAddress(daemonAddr)
if err != nil {
return fmt.Errorf("listen daemon interface: %w", err)
}
go func() {
defer listen.Close()
if split[0] == "unix" {
if err := os.Chmod(split[1], 0666); err != nil {
log.Errorf("failed setting daemon permissions: %v", split[1])
var jsonListener *socketListener
if !jsonSocketDisabled {
jsonListener, err = listenOnAddress(jsonSocket)
if err != nil {
_ = daemonListener.Close()
return fmt.Errorf("listen daemon JSON interface: %w", err)
}
} else {
removeStaleUnixSocketForAddress(jsonSocket)
}
go func() {
defer daemonListener.Close()
if jsonListener != nil {
defer jsonListener.Close()
}
if err := daemonListener.chmodUnixSocket("daemon"); err != nil {
log.Error(err)
return
}
if jsonListener != nil {
if err := jsonListener.chmodUnixSocket("daemon JSON"); err != nil {
log.Error(err)
return
}
}
@@ -71,8 +72,16 @@ func (p *program) Start(svc service.Service) error {
p.serverInstance = serverInstance
p.serverInstanceMu.Unlock()
log.Printf("started daemon server: %v", split[1])
if err := p.serv.Serve(listen); err != nil {
if jsonListener != nil {
if err := p.startJSONGateway(jsonListener, daemonAddr); err != nil {
log.Fatalf("failed to start daemon JSON server: %v", err)
}
} else {
log.Debug("daemon JSON socket disabled")
}
log.Printf("started daemon server: %v", daemonListener.address)
if err := p.serv.Serve(daemonListener.Listener); err != nil {
log.Errorf("failed to serve daemon requests: %v", err)
}
}()
@@ -92,6 +101,20 @@ func (p *program) Stop(srv service.Service) error {
p.cancel()
p.jsonServMu.Lock()
jsonServ := p.jsonServ
p.jsonServMu.Unlock()
if jsonServ != nil {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 2*time.Second)
if err := jsonServ.Shutdown(shutdownCtx); err != nil {
log.Errorf("failed to stop daemon JSON server gracefully: %v", err)
if err := jsonServ.Close(); err != nil {
log.Errorf("failed to close daemon JSON server: %v", err)
}
}
shutdownCancel()
}
if p.serv != nil {
p.serv.Stop()
}

View File

@@ -67,6 +67,11 @@ func buildServiceArguments() []string {
args = append(args, "--disable-networks")
}
args = append(args, "--json-socket", jsonSocket)
if jsonSocketDisabled {
args = append(args, "--disable-json-socket")
}
return args
}

View File

@@ -0,0 +1,52 @@
//go:build !ios && !android
package cmd
import (
"context"
"errors"
"net"
"net/http"
"strings"
"time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/netbirdio/netbird/client/proto"
)
func grpcGatewayEndpoint(addr string) string {
return strings.TrimPrefix(addr, "tcp://")
}
func (p *program) startJSONGateway(jsonListener *socketListener, daemonEndpoint string) error {
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if err := proto.RegisterDaemonServiceHandlerFromEndpoint(p.ctx, mux, grpcGatewayEndpoint(daemonEndpoint), opts); err != nil {
return err
}
jsonServer := &http.Server{
Handler: mux,
ReadHeaderTimeout: 5 * time.Second,
BaseContext: func(net.Listener) context.Context {
return p.ctx
},
}
p.jsonServMu.Lock()
p.jsonServ = jsonServer
p.jsonServMu.Unlock()
go func() {
log.Printf("started daemon JSON server: %v", jsonListener.address)
if err := jsonServer.Serve(jsonListener.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Errorf("failed to serve daemon JSON requests: %v", err)
}
}()
return nil
}

View File

@@ -23,6 +23,7 @@ const serviceParamsFile = "service.json"
type serviceParams struct {
LogLevel string `json:"log_level"`
DaemonAddr string `json:"daemon_addr"`
JSONSocket string `json:"json_socket"`
ManagementURL string `json:"management_url,omitempty"`
ConfigPath string `json:"config_path,omitempty"`
LogFiles []string `json:"log_files,omitempty"`
@@ -30,6 +31,7 @@ type serviceParams struct {
DisableUpdateSettings bool `json:"disable_update_settings,omitempty"`
EnableCapture bool `json:"enable_capture,omitempty"`
DisableNetworks bool `json:"disable_networks,omitempty"`
DisableJSONSocket bool `json:"disable_json_socket,omitempty"`
ServiceEnvVars map[string]string `json:"service_env_vars,omitempty"`
}
@@ -75,6 +77,7 @@ func currentServiceParams() *serviceParams {
params := &serviceParams{
LogLevel: logLevel,
DaemonAddr: daemonAddr,
JSONSocket: jsonSocket,
ManagementURL: managementURL,
ConfigPath: configPath,
LogFiles: logFiles,
@@ -82,6 +85,7 @@ func currentServiceParams() *serviceParams {
DisableUpdateSettings: updateSettingsDisabled,
EnableCapture: captureEnabled,
DisableNetworks: networksDisabled,
DisableJSONSocket: jsonSocketDisabled,
}
if len(serviceEnvVars) > 0 {
@@ -113,9 +117,8 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
return
}
// For fields with non-empty defaults (log-level, daemon-addr), keep the
// != "" guard so that an older service.json missing the field doesn't
// clobber the default with an empty string.
// For fields with non-empty defaults, keep the != "" guard so that an older
// service.json missing the field doesn't clobber the default with an empty string.
if !rootCmd.PersistentFlags().Changed("log-level") && params.LogLevel != "" {
logLevel = params.LogLevel
}
@@ -124,6 +127,20 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
daemonAddr = params.DaemonAddr
}
jsonSocketChanged := serviceCmd.PersistentFlags().Changed("json-socket")
if !jsonSocketChanged && params.JSONSocket != "" {
jsonSocket = params.JSONSocket
}
if !serviceCmd.PersistentFlags().Changed("disable-json-socket") {
jsonSocketDisabled = params.DisableJSONSocket
// Passing --json-socket should re-enable the JSON gateway unless
// --disable-json-socket was explicitly provided too.
if jsonSocketChanged {
jsonSocketDisabled = false
}
}
// For optional fields where empty means "use default", always apply so
// that an explicit clear (--management-url "") persists across reinstalls.
if !rootCmd.PersistentFlags().Changed("management-url") {

View File

@@ -530,6 +530,7 @@ func fieldToGlobalVar(field string) string {
m := map[string]string{
"LogLevel": "logLevel",
"DaemonAddr": "daemonAddr",
"JSONSocket": "jsonSocket",
"ManagementURL": "managementURL",
"ConfigPath": "configPath",
"LogFiles": "logFiles",
@@ -537,6 +538,7 @@ func fieldToGlobalVar(field string) string {
"DisableUpdateSettings": "updateSettingsDisabled",
"EnableCapture": "captureEnabled",
"DisableNetworks": "networksDisabled",
"DisableJSONSocket": "jsonSocketDisabled",
"ServiceEnvVars": "serviceEnvVars",
}
if v, ok := m[field]; ok {

View File

@@ -0,0 +1,83 @@
//go:build !ios && !android
package cmd
import (
"fmt"
"net"
"os"
"strings"
log "github.com/sirupsen/logrus"
)
type socketListener struct {
net.Listener
network string
address string
}
func listenOnAddress(addr string) (*socketListener, error) {
network, address, err := parseListenAddress(addr)
if err != nil {
return nil, err
}
if network == "unix" {
removeStaleUnixSocket(address)
}
listener, err := net.Listen(network, address)
if err != nil {
return nil, err
}
return &socketListener{Listener: listener, network: network, address: address}, nil
}
func parseListenAddress(addr string) (string, string, error) {
network, address, ok := strings.Cut(addr, "://")
if !ok || network == "" || address == "" {
return "", "", fmt.Errorf("address must be in [unix|tcp]://[path|host:port] format: %q", addr)
}
switch network {
case "unix", "tcp":
return network, address, nil
default:
return "", "", fmt.Errorf("unsupported daemon address protocol: %v", network)
}
}
func removeStaleUnixSocket(path string) {
stat, err := os.Stat(path)
if err == nil && !stat.IsDir() {
if err := os.Remove(path); err != nil {
log.Debugf("remove socket file: %v", err)
}
return
}
if err != nil && !os.IsNotExist(err) {
log.Debugf("stat socket file: %v", err)
}
}
func removeStaleUnixSocketForAddress(addr string) {
network, address, err := parseListenAddress(addr)
if err != nil || network != "unix" {
return
}
removeStaleUnixSocket(address)
}
func (l *socketListener) chmodUnixSocket(description string) error {
if l == nil || l.network != "unix" {
return nil
}
if err := os.Chmod(l.address, 0666); err != nil {
return fmt.Errorf("failed setting %s permissions for %s: %w", description, l.address, err)
}
return nil
}

View File

@@ -41,7 +41,6 @@ type ICEBind struct {
*wgConn.StdNetBind
transportNet transport.Net
filterFn udpmux.FilterFn
address wgaddr.Address
mtu uint16
@@ -61,12 +60,11 @@ type ICEBind struct {
ipv6Conn *net.UDPConn
}
func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wgaddr.Address, mtu uint16) *ICEBind {
func NewICEBind(transportNet transport.Net, address wgaddr.Address, mtu uint16) *ICEBind {
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
ib := &ICEBind{
StdNetBind: b,
transportNet: transportNet,
filterFn: filterFn,
address: address,
mtu: mtu,
endpoints: make(map[netip.Addr]net.Conn),
@@ -265,7 +263,6 @@ func (s *ICEBind) createOrUpdateMux() {
udpmux.UniversalUDPMuxParams{
UDPConn: muxConn,
Net: s.transportNet,
FilterFn: s.filterFn,
WGAddress: s.address,
MTU: s.mtu,
},

View File

@@ -289,7 +289,7 @@ func setupICEBind(t *testing.T) *ICEBind {
IP: netip.MustParseAddr("100.64.0.1"),
Network: netip.MustParsePrefix("100.64.0.0/10"),
}
return NewICEBind(transportNet, nil, address, 1280)
return NewICEBind(transportNet, address, 1280)
}
func createDualStackConns(t *testing.T) (*net.UDPConn, *net.UDPConn) {

View File

@@ -32,8 +32,6 @@ type TunKernelDevice struct {
link *wgLink
udpMuxConn net.PacketConn
udpMux *udpmux.UniversalUDPMuxDefault
filterFn udpmux.FilterFn
}
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, transportNet transport.Net) *TunKernelDevice {
@@ -104,7 +102,6 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
bindParams := udpmux.UniversalUDPMuxParams{
UDPConn: nbnet.WrapPacketConn(rawSock),
Net: t.transportNet,
FilterFn: t.filterFn,
WGAddress: t.address,
MTU: t.mtu,
}

View File

@@ -63,7 +63,6 @@ type WGIFaceOpts struct {
MTU uint16
MobileArgs *device.MobileIFaceArguments
TransportNet transport.Net
FilterFn udpmux.FilterFn
DisableDNS bool
}

View File

@@ -11,7 +11,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
var tun WGTunDevice
if netstack.IsEnabled() {

View File

@@ -9,7 +9,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
if netstack.IsEnabled() {
wgIFace := &WGIface{

View File

@@ -10,7 +10,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
wgIFace := &WGIface{
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, opts.MobileArgs.TunFd),

View File

@@ -14,7 +14,7 @@ import (
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
if netstack.IsEnabled() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
return &WGIface{
tun: device.NewNetstackDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr()),
userspaceBind: true,
@@ -30,7 +30,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
}
if device.ModuleTunIsLoaded() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
return &WGIface{
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind),
userspaceBind: true,

View File

@@ -8,8 +8,6 @@ import (
"context"
"fmt"
"net"
"net/netip"
"sync"
"time"
log "github.com/sirupsen/logrus"
@@ -22,10 +20,6 @@ import (
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
// FilterFn is a function that filters out candidates based on the address.
// If it returns true, the address is to be filtered. It also returns the prefix of matching route.
type FilterFn func(address netip.Addr) (bool, netip.Prefix, error)
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn
// It then passes packets to the UDPMux that does the actual connection muxing.
type UniversalUDPMuxDefault struct {
@@ -43,7 +37,6 @@ type UniversalUDPMuxParams struct {
UDPConn net.PacketConn
XORMappedAddrCacheTTL time.Duration
Net transport.Net
FilterFn FilterFn
WGAddress wgaddr.Address
MTU uint16
}
@@ -68,7 +61,6 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
PacketConn: params.UDPConn,
mux: m,
logger: params.Logger,
filterFn: params.FilterFn,
address: params.WGAddress,
}
@@ -115,15 +107,12 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
}
}
// UDPConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets
// UDPConn is a wrapper around UDPMux conn that overrides WriteTo to drop packets destined for the overlay subnet.
type UDPConn struct {
net.PacketConn
mux *UniversalUDPMuxDefault
logger logging.LeveledLogger
filterFn FilterFn
// TODO: reset cache on route changes
addrCache sync.Map
address wgaddr.Address
mux *UniversalUDPMuxDefault
logger logging.LeveledLogger
address wgaddr.Address
}
// GetPacketConn returns the underlying PacketConn
@@ -132,65 +121,16 @@ func (u *UDPConn) GetPacketConn() net.PacketConn {
}
func (u *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
if u.filterFn == nil {
udpAddr, ok := addr.(*net.UDPAddr)
if !ok {
return u.PacketConn.WriteTo(b, addr)
}
if isRouted, found := u.addrCache.Load(addr.String()); found {
return u.handleCachedAddress(isRouted.(bool), b, addr)
}
return u.handleUncachedAddress(b, addr)
}
func (u *UDPConn) handleCachedAddress(isRouted bool, b []byte, addr net.Addr) (int, error) {
if isRouted {
return 0, fmt.Errorf("address %s is part of a routed network, refusing to write", addr)
}
return u.PacketConn.WriteTo(b, addr)
}
func (u *UDPConn) handleUncachedAddress(b []byte, addr net.Addr) (int, error) {
if err := u.performFilterCheck(addr); err != nil {
return 0, err
}
return u.PacketConn.WriteTo(b, addr)
}
func (u *UDPConn) performFilterCheck(addr net.Addr) error {
host, err := getHostFromAddr(addr)
if err != nil {
log.Errorf("Failed to get host from address %s: %v", addr, err)
return nil
}
a, err := netip.ParseAddr(host)
if err != nil {
log.Errorf("Failed to parse address %s: %v", addr, err)
return nil
}
if u.address.Network.Contains(a) {
dst := udpAddr.AddrPort().Addr().Unmap()
if (u.address.Network.IsValid() && u.address.Network.Contains(dst)) || (u.address.IPv6Net.IsValid() && u.address.IPv6Net.Contains(dst)) {
log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
return fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
return 0, fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
}
if isRouted, prefix, err := u.filterFn(a); err != nil {
log.Errorf("Failed to check if address %s is routed: %v", addr, err)
} else {
u.addrCache.Store(addr.String(), isRouted)
if isRouted {
// Extra log, as the error only shows up with ICE logging enabled
log.Infof("address %s is part of routed network %s, refusing to write", addr, prefix)
return fmt.Errorf("address %s is part of routed network %s, refusing to write", addr, prefix)
}
}
return nil
}
func getHostFromAddr(addr net.Addr) (string, error) {
host, _, err := net.SplitHostPort(addr.String())
return host, err
return u.PacketConn.WriteTo(b, addr)
}
// GetSharedConn returns the shared udp conn
@@ -225,6 +165,13 @@ func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.A
return nil
}
src := udpAddr.AddrPort().Addr().Unmap()
wg := m.params.WGAddress
if (wg.Network.IsValid() && wg.Network.Contains(src)) || (wg.IPv6Net.IsValid() && wg.IPv6Net.Contains(src)) {
log.Debugf("dropping STUN message from overlay source %s", udpAddr)
return nil
}
if m.isXORMappedResponse(msg, udpAddr.String()) {
err := m.handleXORMappedResponse(udpAddr, msg)
if err != nil {

View File

@@ -66,7 +66,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,

View File

@@ -22,7 +22,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,

View File

@@ -14,7 +14,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
@@ -54,7 +53,6 @@ import (
"github.com/netbirdio/netbird/client/internal/relay"
"github.com/netbirdio/netbird/client/internal/rosenpass"
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/client/internal/syncstore"
"github.com/netbirdio/netbird/client/internal/updater"
@@ -90,13 +88,6 @@ var ErrResetConnection = fmt.Errorf("reset connection")
var ErrEngineAlreadyStarted = errors.New("engine already started")
// engineRestartCount and engineLastRestart track client-restart cadence across
// engine recreations so a restart loop is distinguishable from rare restarts.
var (
engineRestartCount atomic.Int64
engineLastRestart atomic.Int64
)
type EngineConfig struct {
WgPort int
WgIfaceName string
@@ -908,11 +899,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
started := time.Now()
defer func() {
duration := time.Since(started)
if update.GetNetworkMap() != nil {
log.Infof("sync finished in %s, %d", duration, update.GetNetworkMap().GetSerial())
} else {
log.Infof("sync finished in %s", duration)
}
log.Infof("sync finished in %s", duration)
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
}()
e.syncMsgMux.Lock()
@@ -922,23 +909,14 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if e.ctx.Err() != nil {
return e.ctx.Err()
}
serial := update.GetNetworkMap().GetSerial()
if nm := update.GetNetworkMap(); nm != nil {
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
} else {
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
}
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
}
startTime := time.Now()
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
return err
}
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
// Posture checks are bound to the network map presence:
// NetworkMap != nil, checks present -> apply the received checks
@@ -949,21 +927,17 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if nm == nil {
return nil
}
startTime = time.Now()
if err := e.updateChecksIfNew(update.Checks); err != nil {
return err
}
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
e.persistSyncResponse(update)
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
// only apply new changes and ignore old ones
startTime = time.Now()
if err := e.updateNetworkMap(nm); err != nil {
return err
}
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
@@ -1383,56 +1357,44 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
startTime := time.Now()
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
log.Errorf("failed to update dns server, err: %v", err)
}
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
// apply routes first, route related actions might depend on routing being enabled
startTime = time.Now()
routes := toRoutes(networkMap.GetRoutes())
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
// lazy mgr needs to be aware of which routes are available before they are applied
if e.connMgr != nil {
e.connMgr.UpdateRouteHAMap(clientRoutes)
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
}
startTime = time.Now()
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
log.Errorf("failed to update routes: %v", err)
}
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
if e.acl != nil {
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
}
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
// Ingress forward rules
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
if err != nil {
log.Errorf("failed to update forward rules, err: %v", err)
}
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
startTime = time.Now()
e.updateOfflinePeers(networkMap.GetOfflinePeers())
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
// Filter out own peer from the remote peers list
localPubKey := e.config.WgPrivateKey.PublicKey().String()
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
@@ -1450,24 +1412,20 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
return err
}
} else {
startTime = time.Now()
err := e.removePeers(remotePeers)
if err != nil {
return err
}
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.modifyPeers(remotePeers)
if err != nil {
return err
}
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.addNewPeers(remotePeers)
if err != nil {
return err
}
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
e.statusRecorder.FinishPeerListModifications()
@@ -1480,11 +1438,9 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
e.updateSSHServerAuth(networkMap.GetSshAuth())
}
startTime = time.Now()
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
e.networkSerial = serial
@@ -1999,7 +1955,6 @@ func (e *Engine) newWgIface() (*iface.WGIface, error) {
WGPrivKey: e.config.WgPrivateKey.String(),
MTU: e.config.MTU,
TransportNet: transportNet,
FilterFn: e.addrViaRoutes,
DisableDNS: e.config.DisableDNS,
}
@@ -2216,14 +2171,7 @@ func (e *Engine) triggerClientRestart() {
return
}
// Cadence survives engine recreation (package-level), so a restart loop shows
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
n := engineRestartCount.Add(1)
var sinceLast time.Duration
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
sinceLast = time.Since(time.Unix(0, prev))
}
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
log.Info("restarting engine")
CtxGetState(e.ctx).Set(StatusConnecting)
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
log.Infof("cancelling client context, engine will be recreated")
@@ -2254,21 +2202,6 @@ func (e *Engine) startNetworkMonitor() {
}()
}
func (e *Engine) addrViaRoutes(addr netip.Addr) (bool, netip.Prefix, error) {
var vpnRoutes []netip.Prefix
for _, routes := range e.routeManager.GetClientRoutes() {
if len(routes) > 0 && routes[0] != nil {
vpnRoutes = append(vpnRoutes, routes[0].Network)
}
}
if isVpn, prefix := systemops.IsAddrRouted(addr, vpnRoutes); isVpn {
return true, prefix, nil
}
return false, netip.Prefix{}, nil
}
func (e *Engine) stopDNSServer() {
if e.dnsServer == nil {
return

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/netip"
"runtime/debug"
"slices"
"sync"
"time"
@@ -193,7 +192,6 @@ func (s *StatusChangeSubscription) Events() chan map[string]RouterState {
// Pure read methods take RLock; anything that mutates state takes Lock.
type Status struct {
mux sync.RWMutex
muxRelays sync.RWMutex
peers map[string]State
ipToKey map[string]string
changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription
@@ -228,8 +226,6 @@ type Status struct {
routeIDLookup routeIDLookup
wgIface WGIfaceStatus
profile *StatusProfile
}
// NewRecorder returns a new Status instance
@@ -244,23 +240,16 @@ func NewRecorder(mgmAddress string) *Status {
notifier: newNotifier(),
mgmAddress: mgmAddress,
resolvedDomainsStates: map[domain.Domain]ResolvedDomainInfo{},
profile: NewStatusProfile(context.Background()),
}
}
func (d *Status) StartProfile(ctx context.Context) {
d.profile.Start(ctx)
}
func (d *Status) SetRelayMgr(manager *relayClient.Manager) {
d.profile.inc("SetRelayMgr")
d.muxRelays.Lock()
defer d.muxRelays.Unlock()
d.mux.Lock()
defer d.mux.Unlock()
d.relayMgr = manager
}
func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
d.profile.inc("SetIngressGwMgr")
d.mux.Lock()
defer d.mux.Unlock()
d.ingressGwMgr = ingressGwMgr
@@ -268,7 +257,6 @@ func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
// ReplaceOfflinePeers replaces
func (d *Status) ReplaceOfflinePeers(replacement []State) {
d.profile.inc("ReplaceOfflinePeers")
d.mux.Lock()
defer d.mux.Unlock()
d.offlinePeers = make([]State, len(replacement))
@@ -280,7 +268,6 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
// AddPeer adds peer to Daemon status map
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string) error {
d.profile.inc("AddPeer")
d.mux.Lock()
defer d.mux.Unlock()
@@ -308,7 +295,6 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string)
// GetPeer adds peer to Daemon status map
func (d *Status) GetPeer(peerPubKey string) (State, error) {
d.profile.inc("GetPeer")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -320,7 +306,6 @@ func (d *Status) GetPeer(peerPubKey string) (State, error) {
}
func (d *Status) PeerByIP(ip string) (string, bool) {
d.profile.inc("PeerByIP")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -338,7 +323,6 @@ func (d *Status) PeerByIP(ip string) (string, bool) {
// active peers are matched; peers moved into the offline slice by
// ReplaceOfflinePeers are intentionally treated as unknown.
func (d *Status) PeerStateByIP(ip string) (State, bool) {
d.profile.inc("PeerStateByIP")
if ip == "" {
return State{}, false
}
@@ -357,7 +341,6 @@ func (d *Status) PeerStateByIP(ip string) (State, bool) {
// RemovePeer removes peer from Daemon status map
func (d *Status) RemovePeer(peerPubKey string) error {
d.profile.inc("RemovePeer")
d.mux.Lock()
defer d.mux.Unlock()
@@ -379,7 +362,6 @@ func (d *Status) RemovePeer(peerPubKey string) error {
// UpdatePeerState updates peer status
func (d *Status) UpdatePeerState(receivedState State) error {
d.profile.inc("UpdatePeerState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -422,7 +404,6 @@ func (d *Status) UpdatePeerState(receivedState State) error {
}
func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error {
d.profile.inc("AddPeerStateRoute")
d.mux.Lock()
peerState, ok := d.peers[peer]
@@ -448,7 +429,6 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R
}
func (d *Status) RemovePeerStateRoute(peer string, route string) error {
d.profile.inc("RemovePeerStateRoute")
d.mux.Lock()
peerState, ok := d.peers[peer]
@@ -479,13 +459,11 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) {
if d == nil {
return nil, false
}
d.profile.inc("CheckRoutes")
resId, isExitNode := d.routeIDLookup.Lookup(ip)
return []byte(resId), isExitNode
}
func (d *Status) UpdatePeerICEState(receivedState State) error {
d.profile.inc("UpdatePeerICEState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -525,7 +503,6 @@ func (d *Status) UpdatePeerICEState(receivedState State) error {
}
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
d.profile.inc("UpdatePeerRelayedState")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -562,7 +539,6 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error {
}
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
d.profile.inc("UpdatePeerRelayedStateToDisconnected")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -598,7 +574,6 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error
}
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
d.profile.inc("UpdatePeerICEStateToDisconnected")
d.mux.Lock()
peerState, ok := d.peers[receivedState.PubKey]
@@ -638,7 +613,6 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
// UpdateWireGuardPeerState updates the WireGuard bits of the peer state
func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats configurer.WGStats) error {
d.profile.inc("UpdateWireGuardPeerState")
d.mux.Lock()
defer d.mux.Unlock()
@@ -666,7 +640,6 @@ func hasConnStatusChanged(oldStatus, newStatus ConnStatus) bool {
// UpdatePeerFQDN update peer's state fqdn only
func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
d.profile.inc("UpdatePeerFQDN")
d.mux.Lock()
defer d.mux.Unlock()
@@ -683,7 +656,6 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
// UpdatePeerSSHHostKey updates peer's SSH host key
func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) error {
d.profile.inc("UpdatePeerSSHHostKey")
d.mux.Lock()
defer d.mux.Unlock()
@@ -700,7 +672,6 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro
// FinishPeerListModifications this event invoke the notification
func (d *Status) FinishPeerListModifications() {
d.profile.inc("FinishPeerListModifications")
d.mux.Lock()
if !d.peerListChangedForNotification {
@@ -733,7 +704,6 @@ func (d *Status) FinishPeerListModifications() {
}
func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription {
d.profile.inc("SubscribeToPeerStateChanges")
d.mux.Lock()
defer d.mux.Unlock()
@@ -747,7 +717,6 @@ func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string)
}
func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) {
d.profile.inc("UnsubscribePeerStateChanges")
d.mux.Lock()
defer d.mux.Unlock()
@@ -773,7 +742,6 @@ func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscript
// GetLocalPeerState returns the local peer state
func (d *Status) GetLocalPeerState() LocalPeerState {
d.profile.inc("GetLocalPeerState")
d.mux.RLock()
defer d.mux.RUnlock()
return d.localPeer.Clone()
@@ -781,7 +749,6 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
// UpdateLocalPeerState updates local peer status
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
d.profile.inc("UpdateLocalPeerState")
d.mux.Lock()
d.localPeer = localPeerState
fqdn := d.localPeer.FQDN
@@ -796,7 +763,6 @@ func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
// AddLocalPeerStateRoute adds a route to the local peer state
func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
d.profile.inc("AddLocalPeerStateRoute")
d.mux.Lock()
defer d.mux.Unlock()
@@ -814,7 +780,6 @@ func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
// RemoveLocalPeerStateRoute removes a route from the local peer state
func (d *Status) RemoveLocalPeerStateRoute(route string) {
d.profile.inc("RemoveLocalPeerStateRoute")
d.mux.Lock()
defer d.mux.Unlock()
@@ -828,7 +793,6 @@ func (d *Status) RemoveLocalPeerStateRoute(route string) {
// AddResolvedIPLookupEntry adds a resolved IP lookup entry
func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.ResID) {
d.profile.inc("AddResolvedIPLookupEntry")
d.mux.Lock()
defer d.mux.Unlock()
@@ -837,7 +801,6 @@ func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.
// RemoveResolvedIPLookupEntry removes a resolved IP lookup entry
func (d *Status) RemoveResolvedIPLookupEntry(route string) {
d.profile.inc("RemoveResolvedIPLookupEntry")
d.mux.Lock()
defer d.mux.Unlock()
@@ -849,7 +812,6 @@ func (d *Status) RemoveResolvedIPLookupEntry(route string) {
// CleanLocalPeerStateRoutes cleans all routes from the local peer state
func (d *Status) CleanLocalPeerStateRoutes() {
d.profile.inc("CleanLocalPeerStateRoutes")
d.mux.Lock()
defer d.mux.Unlock()
@@ -858,7 +820,6 @@ func (d *Status) CleanLocalPeerStateRoutes() {
// CleanLocalPeerState cleans local peer status
func (d *Status) CleanLocalPeerState() {
d.profile.inc("CleanLocalPeerState")
d.mux.Lock()
d.localPeer = LocalPeerState{}
fqdn := d.localPeer.FQDN
@@ -870,7 +831,6 @@ func (d *Status) CleanLocalPeerState() {
// MarkManagementDisconnected sets ManagementState to disconnected
func (d *Status) MarkManagementDisconnected(err error) {
d.profile.inc("MarkManagementDisconnected")
d.mux.Lock()
d.managementState = false
d.managementError = err
@@ -883,7 +843,6 @@ func (d *Status) MarkManagementDisconnected(err error) {
// MarkManagementConnected sets ManagementState to connected
func (d *Status) MarkManagementConnected() {
d.profile.inc("MarkManagementConnected")
d.mux.Lock()
d.managementState = true
d.managementError = nil
@@ -896,7 +855,6 @@ func (d *Status) MarkManagementConnected() {
// UpdateSignalAddress update the address of the signal server
func (d *Status) UpdateSignalAddress(signalURL string) {
d.profile.inc("UpdateSignalAddress")
d.mux.Lock()
defer d.mux.Unlock()
d.signalAddress = signalURL
@@ -904,7 +862,6 @@ func (d *Status) UpdateSignalAddress(signalURL string) {
// UpdateManagementAddress update the address of the management server
func (d *Status) UpdateManagementAddress(mgmAddress string) {
d.profile.inc("UpdateManagementAddress")
d.mux.Lock()
defer d.mux.Unlock()
d.mgmAddress = mgmAddress
@@ -912,7 +869,6 @@ func (d *Status) UpdateManagementAddress(mgmAddress string) {
// UpdateRosenpass update the Rosenpass configuration
func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
d.profile.inc("UpdateRosenpass")
d.mux.Lock()
defer d.mux.Unlock()
d.rosenpassPermissive = rosenpassPermissive
@@ -920,7 +876,6 @@ func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
}
func (d *Status) UpdateLazyConnection(enabled bool) {
d.profile.inc("UpdateLazyConnection")
d.mux.Lock()
defer d.mux.Unlock()
d.lazyConnectionEnabled = enabled
@@ -928,7 +883,6 @@ func (d *Status) UpdateLazyConnection(enabled bool) {
// MarkSignalDisconnected sets SignalState to disconnected
func (d *Status) MarkSignalDisconnected(err error) {
d.profile.inc("MarkSignalDisconnected")
d.mux.Lock()
d.signalState = false
d.signalError = err
@@ -941,7 +895,6 @@ func (d *Status) MarkSignalDisconnected(err error) {
// MarkSignalConnected sets SignalState to connected
func (d *Status) MarkSignalConnected() {
d.profile.inc("MarkSignalConnected")
d.mux.Lock()
d.signalState = true
d.signalError = nil
@@ -953,21 +906,18 @@ func (d *Status) MarkSignalConnected() {
}
func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) {
d.profile.inc("UpdateRelayStates")
d.muxRelays.Lock()
defer d.muxRelays.Unlock()
d.mux.Lock()
defer d.mux.Unlock()
d.relayStates = relayResults
}
func (d *Status) UpdateDNSStates(dnsStates []NSGroupState) {
d.profile.inc("UpdateDNSStates")
d.mux.Lock()
defer d.mux.Unlock()
d.nsGroupStates = dnsStates
}
func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix, resourceId route.ResID) {
d.profile.inc("UpdateResolvedDomainsStates")
d.mux.Lock()
defer d.mux.Unlock()
@@ -983,7 +933,6 @@ func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resol
}
func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
d.profile.inc("DeleteResolvedDomainsStates")
d.mux.Lock()
defer d.mux.Unlock()
@@ -1000,7 +949,6 @@ func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
}
func (d *Status) GetRosenpassState() RosenpassState {
d.profile.inc("GetRosenpassState")
d.mux.RLock()
defer d.mux.RUnlock()
return RosenpassState{
@@ -1010,14 +958,12 @@ func (d *Status) GetRosenpassState() RosenpassState {
}
func (d *Status) GetLazyConnection() bool {
d.profile.inc("GetLazyConnection")
d.mux.RLock()
defer d.mux.RUnlock()
return d.lazyConnectionEnabled
}
func (d *Status) GetManagementState() ManagementState {
d.profile.inc("GetManagementState")
d.mux.RLock()
defer d.mux.RUnlock()
return ManagementState{
@@ -1028,7 +974,6 @@ func (d *Status) GetManagementState() ManagementState {
}
func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
d.profile.inc("UpdateLatency")
if latency <= 0 {
return nil
}
@@ -1046,7 +991,6 @@ func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
// IsLoginRequired determines if a peer's login has expired.
func (d *Status) IsLoginRequired() bool {
d.profile.inc("IsLoginRequired")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -1063,7 +1007,6 @@ func (d *Status) IsLoginRequired() bool {
}
func (d *Status) GetSignalState() SignalState {
d.profile.inc("GetSignalState")
d.mux.RLock()
defer d.mux.RUnlock()
return SignalState{
@@ -1075,34 +1018,24 @@ func (d *Status) GetSignalState() SignalState {
// GetRelayStates returns the stun/turn/permanent relay states
func (d *Status) GetRelayStates() []relay.ProbeResult {
d.profile.inc("GetRelayStates")
d.muxRelays.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("GetRelayStates", started)
}()
d.mux.RLock()
defer d.mux.RUnlock()
if d.relayMgr == nil {
defer d.muxRelays.RUnlock()
return d.relayStates
}
relayMgr := d.relayMgr
// extend the list of stun, turn servers with the relay server connections
relayStates := slices.Clone(d.relayStates)
d.muxRelays.RUnlock()
states := relayMgr.RelayStates()
states := d.relayMgr.RelayStates()
if len(states) == 0 {
// no relay connection tracked yet; surface configured servers as
// unavailable with the real reconnect error when known
err := relayClient.ErrRelayClientNotConnected
if connErr := relayMgr.RelayConnectError(); connErr != nil {
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
err = connErr
}
for _, r := range relayMgr.ServerURLs() {
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
Err: err,
@@ -1122,15 +1055,7 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
}
func (d *Status) ForwardingRules() []firewall.ForwardRule {
d.profile.inc("ForwardingRules")
d.mux.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("ForwardingRules", started)
}()
defer d.mux.RUnlock()
if d.ingressGwMgr == nil {
return nil
@@ -1140,7 +1065,6 @@ func (d *Status) ForwardingRules() []firewall.ForwardRule {
}
func (d *Status) GetDNSStates() []NSGroupState {
d.profile.inc("GetDNSStates")
d.mux.RLock()
defer d.mux.RUnlock()
@@ -1149,7 +1073,6 @@ func (d *Status) GetDNSStates() []NSGroupState {
}
func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo {
d.profile.inc("GetResolvedDomainsStates")
d.mux.RLock()
defer d.mux.RUnlock()
return maps.Clone(d.resolvedDomainsStates)
@@ -1157,7 +1080,6 @@ func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo
// GetFullStatus gets full status
func (d *Status) GetFullStatus() FullStatus {
d.profile.inc("GetFullStatus")
fullStatus := FullStatus{
ManagementState: d.GetManagementState(),
SignalState: d.GetSignalState(),
@@ -1184,31 +1106,26 @@ func (d *Status) GetFullStatus() FullStatus {
// ClientStart will notify all listeners about the new service state
func (d *Status) ClientStart() {
d.profile.inc("ClientStart")
d.notifier.clientStart()
}
// ClientStop will notify all listeners about the new service state
func (d *Status) ClientStop() {
d.profile.inc("ClientStop")
d.notifier.clientStop()
}
// ClientTeardown will notify all listeners about the service is under teardown
func (d *Status) ClientTeardown() {
d.profile.inc("ClientTeardown")
d.notifier.clientTearDown()
}
// SetConnectionListener set a listener to the notifier
func (d *Status) SetConnectionListener(listener Listener) {
d.profile.inc("SetConnectionListener")
d.notifier.setListener(listener)
}
// RemoveConnectionListener remove the listener from the notifier
func (d *Status) RemoveConnectionListener() {
d.profile.inc("RemoveConnectionListener")
d.notifier.removeListener()
}
@@ -1280,7 +1197,6 @@ func (d *Status) PublishEvent(
userMsg string,
metadata map[string]string,
) {
d.profile.inc("PublishEvent")
event := &proto.SystemEvent{
Id: uuid.New().String(),
Severity: severity,
@@ -1292,13 +1208,6 @@ func (d *Status) PublishEvent(
}
d.eventMux.Lock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("PublishEvent", started)
}()
defer d.eventMux.Unlock()
d.eventQueue.Add(event)
@@ -1316,7 +1225,6 @@ func (d *Status) PublishEvent(
// SubscribeToEvents returns a new event subscription
func (d *Status) SubscribeToEvents() *EventSubscription {
d.profile.inc("SubscribeToEvents")
d.eventMux.Lock()
defer d.eventMux.Unlock()
@@ -1332,7 +1240,6 @@ func (d *Status) SubscribeToEvents() *EventSubscription {
// UnsubscribeFromEvents removes an event subscription
func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
d.profile.inc("UnsubscribeFromEvents")
if sub == nil {
return
}
@@ -1348,12 +1255,10 @@ func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
// GetEventHistory returns all events in the queue
func (d *Status) GetEventHistory() []*proto.SystemEvent {
d.profile.inc("GetEventHistory")
return d.eventQueue.GetAll()
}
func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
d.profile.inc("SetWgIface")
d.mux.Lock()
defer d.mux.Unlock()
@@ -1361,15 +1266,7 @@ func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
}
func (d *Status) PeersStatus() (*configurer.Stats, error) {
d.profile.inc("PeersStatus")
d.mux.RLock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("PeersStatus", started)
}()
defer d.mux.RUnlock()
if d.wgIface == nil {
return nil, fmt.Errorf("wgInterface is nil, cannot retrieve peers status")
@@ -1382,15 +1279,7 @@ func (d *Status) PeersStatus() (*configurer.Stats, error) {
// and updates the cached peer states. This ensures accurate handshake times and
// transfer statistics in status reports without running full health probes.
func (d *Status) RefreshWireGuardStats() error {
d.profile.inc("RefreshWireGuardStats")
d.mux.Lock()
// debug lines
started := time.Now()
defer func() {
debugElapsed("RefreshWireGuardStats", started)
}()
defer d.mux.Unlock()
if d.wgIface == nil {
@@ -1555,10 +1444,3 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
return &pbFullStatus
}
func debugElapsed(msg string, startTime time.Time) {
if elapsed := time.Since(startTime); elapsed > 1*time.Second {
log.Infof("run %s took %s", msg, elapsed)
debug.PrintStack()
}
}

View File

@@ -1,97 +0,0 @@
package peer
import (
"context"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
)
type StatusProfile struct {
counts sync.Map
}
func NewStatusProfile(ctx context.Context) *StatusProfile {
s := &StatusProfile{}
go s.Start(ctx)
return s
}
func (s *StatusProfile) Start(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.logCounts()
}
}
}
func (s *StatusProfile) inc(method string) {
if s == nil {
return
}
if v, ok := s.counts.Load(method); ok {
v.(*atomic.Int64).Add(1)
return
}
cnt := &atomic.Int64{}
actual, _ := s.counts.LoadOrStore(method, cnt)
actual.(*atomic.Int64).Add(1)
}
func (s *StatusProfile) snapshot() map[string]int64 {
out := make(map[string]int64)
s.counts.Range(func(k, v any) bool {
out[k.(string)] = v.(*atomic.Int64).Load()
return true
})
return out
}
func (s *StatusProfile) logCounts() {
counts := s.snapshot()
if len(counts) == 0 {
log.Infof("status profile: no Status method calls so far")
return
}
type kv struct {
method string
count int64
}
sorted := make([]kv, 0, len(counts))
var total int64
for m, c := range counts {
if c == 0 {
continue
}
sorted = append(sorted, kv{m, c})
total += c
}
sort.Slice(sorted, func(i, j int) bool {
if sorted[i].count != sorted[j].count {
return sorted[i].count > sorted[j].count
}
return sorted[i].method < sorted[j].method
})
var b strings.Builder
for i, e := range sorted {
if i > 0 {
b.WriteString(", ")
}
b.WriteString(e.method)
b.WriteByte('=')
b.WriteString(strconv.FormatInt(e.count, 10))
}
log.Infof("status profile (cumulative total=%d): %s", total, b.String())
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net"
"net/netip"
"strconv"
"sync"
"time"
@@ -165,10 +164,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
return
}
if candidateViaRoutes(candidate, haRoutes) {
return
}
if err := w.agent.AddRemoteCandidate(candidate); err != nil {
w.log.Errorf("error while handling remote candidate")
return
@@ -466,7 +461,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
}
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
pairStat, ok := agent.GetSelectedCandidatePairStats()
@@ -589,34 +584,6 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
return ec, nil
}
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
addr, err := netip.ParseAddr(candidate.Address())
if err != nil {
log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
return false
}
var routePrefixes []netip.Prefix
for _, routes := range clientRoutes {
if len(routes) > 0 && routes[0] != nil {
routePrefixes = append(routePrefixes, routes[0].Network)
}
}
for _, prefix := range routePrefixes {
// default route is handled by route exclusion / ip rules
if prefix.Bits() == 0 {
continue
}
if prefix.Contains(addr) {
log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
return true
}
}
return false
}
func isRelayCandidate(candidate ice.Candidate) bool {
return candidate.Type() == ice.CandidateTypeRelay
}

View File

@@ -121,9 +121,12 @@ func (r *SysOps) addRouteToNonVPNIntf(prefix netip.Prefix, vpnIntf wgIface, init
return Nexthop{}, vars.ErrRouteNotAllowed
}
// Check if the prefix is part of any local subnets
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
// BSDs blackhole a /32 added inside a directly-connected subnet; Linux/Windows need it to beat the wt0 route.
switch runtime.GOOS {
case "darwin", "freebsd", "netbsd", "openbsd", "dragonfly":
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
}
}
// Determine the exit interface and next hop for the prefix, so we can add a specific route

2497
client/proto/daemon.pb.gw.go Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -12,5 +12,11 @@ script_path=$(dirname "$(realpath "$0")")
cd "$script_path"
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.6
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.6.1
protoc -I ./ ./daemon.proto --go_out=../ --go-grpc_out=../ --experimental_allow_proto3_optional
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v2.26.3
protoc -I ./ ./daemon.proto \
--go_out=../ \
--go-grpc_out=../ \
--grpc-gateway_out=../ \
--grpc-gateway_opt=generate_unbound_methods=true \
--experimental_allow_proto3_optional
cd "$old_pwd"

View File

@@ -136,7 +136,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
networksDisabled: networksDisabled,
jwtCache: newJWTCache(),
}
go s.statusRecorder.StartProfile(ctx)
agent := &serverAgent{s}
s.sleepHandler = sleephandler.New(agent)
s.startSleepDetector()

2
go.mod
View File

@@ -66,6 +66,7 @@ require (
github.com/google/nftables v0.3.0
github.com/gopacket/gopacket v1.4.0
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.2-0.20240212192251-757544f21357
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-secure-stdlib/base62 v0.1.2
github.com/hashicorp/go-version v1.7.0
@@ -330,6 +331,7 @@ require (
golang.org/x/text v0.36.0 // indirect
golang.org/x/tools v0.43.0 // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

View File

@@ -243,11 +243,7 @@ func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
func (c *Client) Connect(ctx context.Context) error {
start := time.Now()
defer func() {
c.log.Infof("connect elapsed time: %v", time.Since(start))
}()
c.log.Infof("connecting to relay server")
c.readLoopMutex.Lock()
defer c.readLoopMutex.Unlock()
@@ -291,11 +287,6 @@ func (c *Client) Connect(ctx context.Context) error {
func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, error) {
peerID := messages.HashID(dstPeerID)
start := time.Now()
defer func() {
c.log.Infof("connect elapsed time: %v", time.Since(start))
}()
c.mu.Lock()
if !c.serviceIsRunning {
c.mu.Unlock()

View File

@@ -8,7 +8,6 @@ import (
"testing"
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/client/iface"
@@ -253,7 +252,7 @@ func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{log: log.WithField("relay", tt.name), relayConn: stubConn{remote: staticAddr{s: tt.s}}}
c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}}
got := c.ConnectedIP()
var gotStr string
if got.IsValid() {

View File

@@ -78,23 +78,6 @@ type GrpcClient struct {
// transport-alive but no longer delivering messages. It is the source of
// truth IsHealthy reads, and is cleared once any frame is received again.
receiveStalled atomic.Bool
// receiveHandoffBlocked is set while the receive loop is parked handing a
// message to a busy decryption worker. The loop stops calling Recv (and
// markReceived) in that window, so the stream looks silent though it is
// healthy. The watchdog reads this to avoid misreading self-inflicted
// receive backpressure as a dead stream: reconnecting cannot help, since the
// new stream feeds the same worker, and only triggers a reconnect storm.
receiveHandoffBlocked atomic.Bool
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
// worker pulled off its queue. Diagnostic only: it lets a stall log show
// whether the worker was draining (busy) or idle when the stream went silent.
lastDecrypt atomic.Int64
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
// time the receive loop spent blocked handing messages to the worker. This is
// time not spent reading the stream, so it quantifies receive backpressure.
handoffWaitTotal atomic.Int64
handoffWaitMax atomic.Int64
handoffWaitCount atomic.Int64
}
// NewClient creates a new Signal client
@@ -370,8 +353,6 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
c.lastDecrypt.Store(time.Now().UnixNano())
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
if err != nil {
return nil, err
@@ -458,22 +439,6 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
return time.Since(time.Unix(0, c.lastReceived.Load()))
}
// idleSinceDecrypt returns how long since the worker last pulled a message.
// Diagnostic only: distinguishes a busy/wedged worker from an idle one.
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
return time.Since(time.Unix(0, c.lastDecrypt.Load()))
}
// receiveAlive reports whether the receive stream shows liveness: it delivered a
// frame within the inactivity threshold, or the receive loop is currently parked
// handing a message to a busy decryption worker. In the latter case the loop has
// stopped calling Recv, so the stream looks silent while being healthy, and
// reconnecting would not help, so the watchdog must treat it as alive.
func (c *GrpcClient) receiveAlive() bool {
return c.idleSinceReceive() < receiveInactivityThreshold ||
c.receiveHandoffBlocked.Load()
}
// watchReceiveStream guards against a receive stream that is transport-alive but
// no longer delivering messages. While the stream is idle past
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
@@ -485,55 +450,18 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
defer ticker.Stop()
var probeSentAt time.Time
var holdLogged bool
var statTicks int
var lastStatTotal int64
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// Periodic backpressure summary so time lost to the worker handoff is
// visible even when no stall fires. Emitted ~once a minute and only
// when the wait grew, to stay quiet on a healthy stream.
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
statTicks = 0
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
max.Round(time.Millisecond), count)
lastStatTotal = int64(total)
}
}
if c.receiveAlive() {
// Attribute the case that matters in the field: silent past the
// threshold but held because the receive loop is parked on the
// worker handoff (backpressure), not a dead stream. Log once per
// hold episode so a persistent worker stall is visible at info.
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
if !holdLogged {
total, max, count := c.handoffWaitStats()
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
holdLogged = true
}
} else {
holdLogged = false
}
if c.idleSinceReceive() < receiveInactivityThreshold {
probeSentAt = time.Time{}
continue
}
holdLogged = false
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
total, max, count := c.handoffWaitStats()
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
c.receiveStalled.Store(true)
c.notifyDisconnected(errReceiveStreamStalled)
cancelStream()
@@ -589,37 +517,12 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
continue
}
// The handoff blocks while the worker is busy, which parks this loop and
// stops Recv. Flag it so the watchdog does not read the resulting silence
// as a dead stream, and account the wait as receive backpressure.
handoffStart := time.Now()
c.receiveHandoffBlocked.Store(true)
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
log.Errorf("failed to add message to decryption worker: %v", err)
}
c.receiveHandoffBlocked.Store(false)
c.recordHandoffWait(time.Since(handoffStart))
}
}
// recordHandoffWait accumulates the time the receive loop was blocked handing a
// message to the worker.
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
c.handoffWaitTotal.Add(int64(d))
c.handoffWaitCount.Add(1)
for {
cur := c.handoffWaitMax.Load()
if int64(d) <= cur || c.handoffWaitMax.CompareAndSwap(cur, int64(d)) {
break
}
}
}
// handoffWaitStats returns cumulative receive-loop handoff backpressure.
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
return time.Duration(c.handoffWaitTotal.Load()), time.Duration(c.handoffWaitMax.Load()), c.handoffWaitCount.Load()
}
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
if c.decryptionWorker != nil {
return

View File

@@ -82,27 +82,3 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
}
}
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
// where a busy decryption worker parks the receive loop on the worker handoff,
// so Recv (and markReceived) stops firing even though the stream is healthy.
// With the receive stream silent past the inactivity threshold but the loop
// blocked on handoff, the watchdog must consider the stream alive rather than
// tear it down (reconnecting feeds the same worker and would not help).
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
c := &GrpcClient{}
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
// Receive stream silent but the loop is parked handing a message to a busy
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
c.receiveHandoffBlocked.Store(true)
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
c.receiveHandoffBlocked.Store(false)
c.markReceived()
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
}

View File

@@ -32,13 +32,6 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
return nil
}
// QueueLen returns the number of messages buffered for decryption. Diagnostic
// only: a non-empty queue while the receive stream is silent indicates the
// receive loop is parked on the handoff rather than the stream being dead.
func (w *Worker) QueueLen() int {
return len(w.encryptedMsgPool)
}
func (w *Worker) Work(ctx context.Context) {
for {
select {