mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-27 10:19:55 +00:00
Compare commits
16 Commits
feature/io
...
fix/revert
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3c66ced13 | ||
|
|
94284d5400 | ||
|
|
fa91c119c3 | ||
|
|
b5d2f054c2 | ||
|
|
50ff095d68 | ||
|
|
0afe52cfeb | ||
|
|
31c277b1df | ||
|
|
566d21c2c3 | ||
|
|
858e2d1c34 | ||
|
|
35ed69bfe7 | ||
|
|
8446713d28 | ||
|
|
12f2e69af2 | ||
|
|
4cb2c62f2a | ||
|
|
17b2044596 | ||
|
|
07101c59ac | ||
|
|
51b6f6291b |
@@ -130,7 +130,7 @@ func debugConfigDump(cmd *cobra.Command, _ []string) error {
|
||||
|
||||
client := proto.NewDaemonServiceClient(conn)
|
||||
resp, err := client.GetConfig(cmd.Context(), &proto.GetConfigRequest{
|
||||
ProfileName: activeProf.Name,
|
||||
ProfileName: string(activeProf.ID),
|
||||
Username: currUser.Username,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -41,6 +41,7 @@ type ICEBind struct {
|
||||
*wgConn.StdNetBind
|
||||
|
||||
transportNet transport.Net
|
||||
filterFn udpmux.FilterFn
|
||||
address wgaddr.Address
|
||||
mtu uint16
|
||||
|
||||
@@ -60,11 +61,12 @@ type ICEBind struct {
|
||||
ipv6Conn *net.UDPConn
|
||||
}
|
||||
|
||||
func NewICEBind(transportNet transport.Net, address wgaddr.Address, mtu uint16) *ICEBind {
|
||||
func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wgaddr.Address, mtu uint16) *ICEBind {
|
||||
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
|
||||
ib := &ICEBind{
|
||||
StdNetBind: b,
|
||||
transportNet: transportNet,
|
||||
filterFn: filterFn,
|
||||
address: address,
|
||||
mtu: mtu,
|
||||
endpoints: make(map[netip.Addr]net.Conn),
|
||||
@@ -263,6 +265,7 @@ func (s *ICEBind) createOrUpdateMux() {
|
||||
udpmux.UniversalUDPMuxParams{
|
||||
UDPConn: muxConn,
|
||||
Net: s.transportNet,
|
||||
FilterFn: s.filterFn,
|
||||
WGAddress: s.address,
|
||||
MTU: s.mtu,
|
||||
},
|
||||
|
||||
@@ -289,7 +289,7 @@ func setupICEBind(t *testing.T) *ICEBind {
|
||||
IP: netip.MustParseAddr("100.64.0.1"),
|
||||
Network: netip.MustParsePrefix("100.64.0.0/10"),
|
||||
}
|
||||
return NewICEBind(transportNet, address, 1280)
|
||||
return NewICEBind(transportNet, nil, address, 1280)
|
||||
}
|
||||
|
||||
func createDualStackConns(t *testing.T) (*net.UDPConn, *net.UDPConn) {
|
||||
|
||||
@@ -32,6 +32,8 @@ type TunKernelDevice struct {
|
||||
link *wgLink
|
||||
udpMuxConn net.PacketConn
|
||||
udpMux *udpmux.UniversalUDPMuxDefault
|
||||
|
||||
filterFn udpmux.FilterFn
|
||||
}
|
||||
|
||||
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, transportNet transport.Net) *TunKernelDevice {
|
||||
@@ -102,6 +104,7 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
|
||||
bindParams := udpmux.UniversalUDPMuxParams{
|
||||
UDPConn: nbnet.WrapPacketConn(rawSock),
|
||||
Net: t.transportNet,
|
||||
FilterFn: t.filterFn,
|
||||
WGAddress: t.address,
|
||||
MTU: t.mtu,
|
||||
}
|
||||
|
||||
@@ -63,6 +63,7 @@ type WGIFaceOpts struct {
|
||||
MTU uint16
|
||||
MobileArgs *device.MobileIFaceArguments
|
||||
TransportNet transport.Net
|
||||
FilterFn udpmux.FilterFn
|
||||
DisableDNS bool
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
|
||||
var tun WGTunDevice
|
||||
if netstack.IsEnabled() {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
|
||||
if netstack.IsEnabled() {
|
||||
wgIFace := &WGIface{
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
|
||||
wgIFace := &WGIface{
|
||||
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, opts.MobileArgs.TunFd),
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
if netstack.IsEnabled() {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
return &WGIface{
|
||||
tun: device.NewNetstackDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr()),
|
||||
userspaceBind: true,
|
||||
@@ -30,7 +30,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
}
|
||||
|
||||
if device.ModuleTunIsLoaded() {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
return &WGIface{
|
||||
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind),
|
||||
userspaceBind: true,
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -20,6 +22,10 @@ import (
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
)
|
||||
|
||||
// FilterFn is a function that filters out candidates based on the address.
|
||||
// If it returns true, the address is to be filtered. It also returns the prefix of matching route.
|
||||
type FilterFn func(address netip.Addr) (bool, netip.Prefix, error)
|
||||
|
||||
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn
|
||||
// It then passes packets to the UDPMux that does the actual connection muxing.
|
||||
type UniversalUDPMuxDefault struct {
|
||||
@@ -37,6 +43,7 @@ type UniversalUDPMuxParams struct {
|
||||
UDPConn net.PacketConn
|
||||
XORMappedAddrCacheTTL time.Duration
|
||||
Net transport.Net
|
||||
FilterFn FilterFn
|
||||
WGAddress wgaddr.Address
|
||||
MTU uint16
|
||||
}
|
||||
@@ -61,6 +68,7 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
|
||||
PacketConn: params.UDPConn,
|
||||
mux: m,
|
||||
logger: params.Logger,
|
||||
filterFn: params.FilterFn,
|
||||
address: params.WGAddress,
|
||||
}
|
||||
|
||||
@@ -107,12 +115,15 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// UDPConn is a wrapper around UDPMux conn that overrides WriteTo to drop packets destined for the overlay subnet.
|
||||
// UDPConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets
|
||||
type UDPConn struct {
|
||||
net.PacketConn
|
||||
mux *UniversalUDPMuxDefault
|
||||
logger logging.LeveledLogger
|
||||
address wgaddr.Address
|
||||
mux *UniversalUDPMuxDefault
|
||||
logger logging.LeveledLogger
|
||||
filterFn FilterFn
|
||||
// TODO: reset cache on route changes
|
||||
addrCache sync.Map
|
||||
address wgaddr.Address
|
||||
}
|
||||
|
||||
// GetPacketConn returns the underlying PacketConn
|
||||
@@ -121,18 +132,67 @@ func (u *UDPConn) GetPacketConn() net.PacketConn {
|
||||
}
|
||||
|
||||
func (u *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
|
||||
udpAddr, ok := addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
if u.filterFn == nil {
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
dst := udpAddr.AddrPort().Addr().Unmap()
|
||||
if (u.address.Network.IsValid() && u.address.Network.Contains(dst)) || (u.address.IPv6Net.IsValid() && u.address.IPv6Net.Contains(dst)) {
|
||||
log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
return 0, fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
|
||||
if isRouted, found := u.addrCache.Load(addr.String()); found {
|
||||
return u.handleCachedAddress(isRouted.(bool), b, addr)
|
||||
}
|
||||
|
||||
return u.handleUncachedAddress(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) handleCachedAddress(isRouted bool, b []byte, addr net.Addr) (int, error) {
|
||||
if isRouted {
|
||||
return 0, fmt.Errorf("address %s is part of a routed network, refusing to write", addr)
|
||||
}
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) handleUncachedAddress(b []byte, addr net.Addr) (int, error) {
|
||||
if err := u.performFilterCheck(addr); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) performFilterCheck(addr net.Addr) error {
|
||||
host, err := getHostFromAddr(addr)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get host from address %s: %v", addr, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
a, err := netip.ParseAddr(host)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse address %s: %v", addr, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if u.address.Network.Contains(a) {
|
||||
log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
return fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
}
|
||||
|
||||
if isRouted, prefix, err := u.filterFn(a); err != nil {
|
||||
log.Errorf("Failed to check if address %s is routed: %v", addr, err)
|
||||
} else {
|
||||
u.addrCache.Store(addr.String(), isRouted)
|
||||
if isRouted {
|
||||
// Extra log, as the error only shows up with ICE logging enabled
|
||||
log.Infof("address %s is part of routed network %s, refusing to write", addr, prefix)
|
||||
return fmt.Errorf("address %s is part of routed network %s, refusing to write", addr, prefix)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getHostFromAddr(addr net.Addr) (string, error) {
|
||||
host, _, err := net.SplitHostPort(addr.String())
|
||||
return host, err
|
||||
}
|
||||
|
||||
// GetSharedConn returns the shared udp conn
|
||||
func (m *UniversalUDPMuxDefault) GetSharedConn() net.PacketConn {
|
||||
return m.params.UDPConn
|
||||
@@ -165,13 +225,6 @@ func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.A
|
||||
return nil
|
||||
}
|
||||
|
||||
src := udpAddr.AddrPort().Addr().Unmap()
|
||||
wg := m.params.WGAddress
|
||||
if (wg.Network.IsValid() && wg.Network.Contains(src)) || (wg.IPv6Net.IsValid() && wg.IPv6Net.Contains(src)) {
|
||||
log.Debugf("dropping STUN message from overlay source %s", udpAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.isXORMappedResponse(msg, udpAddr.String()) {
|
||||
err := m.handleXORMappedResponse(udpAddr, msg)
|
||||
if err != nil {
|
||||
|
||||
@@ -66,7 +66,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
|
||||
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
|
||||
endpointAddress := &net.UDPAddr{
|
||||
IP: net.IPv4(10, 0, 0, 1),
|
||||
Port: 1234,
|
||||
|
||||
@@ -22,7 +22,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
|
||||
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
|
||||
endpointAddress := &net.UDPAddr{
|
||||
IP: net.IPv4(10, 0, 0, 1),
|
||||
Port: 1234,
|
||||
|
||||
@@ -51,13 +51,20 @@ type cachedRecord struct {
|
||||
}
|
||||
|
||||
// Resolver caches critical NetBird infrastructure domains.
|
||||
// records, refreshing, mgmtDomain and serverDomains are all guarded by mutex.
|
||||
// records, refreshing, failedResolves, mgmtDomain and serverDomains are all
|
||||
// guarded by mutex.
|
||||
type Resolver struct {
|
||||
records map[dns.Question]*cachedRecord
|
||||
mgmtDomain *domain.Domain
|
||||
serverDomains *dnsconfig.ServerDomains
|
||||
mutex sync.RWMutex
|
||||
|
||||
// failedResolves records the last failed initial resolve per domain so a
|
||||
// domain that never resolves isn't retried on every server-domains update
|
||||
// until refreshBackoff elapses. Entries are cleared on success and pruned
|
||||
// to the current server-domains set.
|
||||
failedResolves map[domain.Domain]time.Time
|
||||
|
||||
chain ChainResolver
|
||||
chainMaxPriority int
|
||||
refreshGroup singleflight.Group
|
||||
@@ -76,9 +83,10 @@ type Resolver struct {
|
||||
// NewResolver creates a new management domains cache resolver.
|
||||
func NewResolver() *Resolver {
|
||||
return &Resolver{
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
failedResolves: make(map[domain.Domain]time.Time),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +181,9 @@ func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
|
||||
|
||||
// AddDomain resolves a domain and stores its A/AAAA records in the cache.
|
||||
// A family that resolves NODATA (nil err, zero records) evicts any stale
|
||||
// entry for that qtype.
|
||||
// entry for that qtype. When one family hard-errors while the other succeeds,
|
||||
// the resolved family is still cached but AddDomain returns an error so the
|
||||
// caller retries the incomplete resolve rather than treating it as complete.
|
||||
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
@@ -203,6 +213,10 @@ func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
log.Debugf("added/updated domain=%s with %d A records and %d AAAA records",
|
||||
d.SafeString(), len(aRecords), len(aaaaRecords))
|
||||
|
||||
if errA != nil || errAAAA != nil {
|
||||
return fmt.Errorf("resolve %s: incomplete, a family failed: %w", d.SafeString(), errors.Join(errA, errAAAA))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -462,6 +476,7 @@ func (m *Resolver) RemoveDomain(d domain.Domain) error {
|
||||
delete(m.records, qAAAA)
|
||||
delete(m.refreshing, qA)
|
||||
delete(m.refreshing, qAAAA)
|
||||
delete(m.failedResolves, d)
|
||||
|
||||
log.Debugf("removed domain=%s from cache", d.SafeString())
|
||||
return nil
|
||||
@@ -505,6 +520,7 @@ func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dn
|
||||
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
|
||||
currentDomains := m.GetCachedDomains()
|
||||
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
|
||||
m.pruneFailedResolves(allDomains)
|
||||
}
|
||||
|
||||
m.addNewDomains(ctx, newDomains)
|
||||
@@ -577,13 +593,85 @@ func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
|
||||
return m.mgmtDomain != nil && domain == *m.mgmtDomain
|
||||
}
|
||||
|
||||
// addNewDomains resolves and caches all domains from the update
|
||||
// addNewDomains resolves and caches domains that are not yet in the cache,
|
||||
// running the lookups concurrently. Domains already cached are skipped and left
|
||||
// to the stale-while-revalidate refresh path, so a sync never re-resolves them
|
||||
// synchronously: once NetBird owns the OS resolver the resolve runs through the
|
||||
// handler chain and would otherwise dial the managed upstreams under the engine
|
||||
// sync lock on every update.
|
||||
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
|
||||
var wg sync.WaitGroup
|
||||
seen := make(map[domain.Domain]struct{}, len(newDomains))
|
||||
for _, newDomain := range newDomains {
|
||||
if err := m.AddDomain(ctx, newDomain); err != nil {
|
||||
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
|
||||
} else {
|
||||
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
|
||||
if _, dup := seen[newDomain]; dup {
|
||||
continue
|
||||
}
|
||||
seen[newDomain] = struct{}{}
|
||||
|
||||
if !m.needsResolve(newDomain) {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(d domain.Domain) {
|
||||
defer wg.Done()
|
||||
if err := m.AddDomain(ctx, d); err != nil {
|
||||
m.markResolveFailed(d)
|
||||
log.Warnf("failed to add/update domain=%s: %v", d.SafeString(), err)
|
||||
return
|
||||
}
|
||||
m.clearResolveFailed(d)
|
||||
log.Debugf("added/updated management cache domain=%s", d.SafeString())
|
||||
}(newDomain)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// needsResolve reports whether d should be resolved now. A recent failed or
|
||||
// incomplete resolve gates retries on the backoff even when one family is
|
||||
// already cached, so a transiently-failed family is retried instead of being
|
||||
// treated as fully resolved. Otherwise a domain with any cached record is left
|
||||
// to the stale-while-revalidate refresh path.
|
||||
func (m *Resolver) needsResolve(d domain.Domain) bool {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if failedAt, ok := m.failedResolves[d]; ok {
|
||||
return time.Since(failedAt) >= refreshBackoff
|
||||
}
|
||||
|
||||
for _, qtype := range []uint16{dns.TypeA, dns.TypeAAAA} {
|
||||
q := dns.Question{Name: dnsName, Qtype: qtype, Qclass: dns.ClassINET}
|
||||
if _, ok := m.records[q]; ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Resolver) markResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
m.failedResolves[d] = time.Now()
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (m *Resolver) clearResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
delete(m.failedResolves, d)
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
// pruneFailedResolves drops failure markers for domains no longer present in
|
||||
// the server-domains set, keeping the map bounded to the current set (a
|
||||
// failed-only domain has no cached record, so RemoveDomain never sees it).
|
||||
func (m *Resolver) pruneFailedResolves(domains domain.List) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
for d := range m.failedResolves {
|
||||
if !slices.Contains(domains, d) {
|
||||
delete(m.failedResolves, d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ type fakeChain struct {
|
||||
mu sync.Mutex
|
||||
calls map[string]int
|
||||
answers map[string][]dns.RR
|
||||
qErr map[string]error
|
||||
err error
|
||||
hasRoot bool
|
||||
onLookup func()
|
||||
@@ -30,6 +31,7 @@ func newFakeChain() *fakeChain {
|
||||
return &fakeChain{
|
||||
calls: map[string]int{},
|
||||
answers: map[string][]dns.RR{},
|
||||
qErr: map[string]error{},
|
||||
hasRoot: true,
|
||||
}
|
||||
}
|
||||
@@ -47,6 +49,9 @@ func (f *fakeChain) ResolveInternal(ctx context.Context, msg *dns.Msg, maxPriori
|
||||
f.calls[key]++
|
||||
answers := f.answers[key]
|
||||
err := f.err
|
||||
if err == nil {
|
||||
err = f.qErr[key]
|
||||
}
|
||||
onLookup := f.onLookup
|
||||
f.mu.Unlock()
|
||||
|
||||
@@ -75,6 +80,12 @@ func (f *fakeChain) setAnswer(name string, qtype uint16, ip string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeChain) setErr(name string, qtype uint16, err error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.qErr[name+"|"+dns.TypeToString[qtype]] = err
|
||||
}
|
||||
|
||||
func (f *fakeChain) callCount(name string, qtype uint16) int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
183
client/internal/dns/mgmt/mgmt_resolve_test.go
Normal file
183
client/internal/dns/mgmt/mgmt_resolve_test.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package mgmt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||
"github.com/netbirdio/netbird/shared/management/domain"
|
||||
)
|
||||
|
||||
// A domain already in the cache must not be re-resolved on a subsequent server
|
||||
// domains update; it is left to the stale-while-revalidate refresh path.
|
||||
func TestResolver_UpdateFromServerDomains_SkipsCached(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("signal.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must resolve the domain")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"cached domain must not be re-resolved on a subsequent update")
|
||||
}
|
||||
|
||||
// New domains in a single update must resolve concurrently rather than serially.
|
||||
func TestResolver_AddNewDomains_ResolvesConcurrently(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
|
||||
var inflight, maxInflight atomic.Int32
|
||||
chain.onLookup = func() {
|
||||
n := inflight.Add(1)
|
||||
for {
|
||||
old := maxInflight.Load()
|
||||
if n <= old || maxInflight.CompareAndSwap(old, n) {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
inflight.Add(-1)
|
||||
}
|
||||
|
||||
relays := []domain.Domain{"a.example.com", "b.example.com", "c.example.com", "d.example.com"}
|
||||
for _, d := range relays {
|
||||
chain.setAnswer(dns.Fqdn(string(d)), dns.TypeA, "10.0.0.2")
|
||||
}
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
start := time.Now()
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: relays})
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
assert.GreaterOrEqual(t, int(maxInflight.Load()), 2, "domains must resolve concurrently")
|
||||
// Serial resolution of 4 domains would take at least 4*50ms; concurrent is far less.
|
||||
assert.Less(t, elapsed, 300*time.Millisecond, "resolution should not be serial")
|
||||
}
|
||||
|
||||
// A domain that fails to resolve must not be retried on every update; the
|
||||
// failure backoff suppresses re-resolution until it expires.
|
||||
func TestResolver_UpdateFromServerDomains_BacksOffFailures(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must attempt the resolve")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"failed resolve must back off and not retry on the next update")
|
||||
}
|
||||
|
||||
// A domain listed under more than one server-domain type (e.g. STUN and TURN on
|
||||
// the same host) must be resolved once per update, not once per occurrence.
|
||||
func TestResolver_AddNewDomains_DedupesDuplicateDomains(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("dup.example.com.", dns.TypeA, "10.0.0.9")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{
|
||||
Stuns: []domain.Domain{"dup.example.com"},
|
||||
Turns: []domain.Domain{"dup.example.com"},
|
||||
}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("dup.example.com.", dns.TypeA),
|
||||
"a domain appearing under multiple server-domain types must resolve once")
|
||||
}
|
||||
|
||||
// A failure marker must be dropped once its domain leaves the server-domains set
|
||||
// so the map stays bounded to the current set.
|
||||
func TestResolver_UpdateFromServerDomains_PrunesFailedResolves(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("gone.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, marked, "failed resolve must be recorded")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("other.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, stillMarked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, stillMarked, "failure marker for a domain no longer in the set must be pruned")
|
||||
}
|
||||
|
||||
// When one family hard-errors while the other resolves, the domain is cached
|
||||
// for the working family but recorded as incomplete so the failed family is
|
||||
// retried under backoff instead of being treated as fully resolved forever.
|
||||
func TestResolver_AddNewDomains_RetriesPartialFamilyFailure(t *testing.T) {
|
||||
d := domain.Domain("relay.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("relay.example.com.", dns.TypeA, "10.0.0.2")
|
||||
chain.setErr("relay.example.com.", dns.TypeAAAA, errors.New("servfail"))
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, aCached := r.records[dns.Question{Name: "relay.example.com.", Qtype: dns.TypeA, Qclass: dns.ClassINET}]
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, aCached, "the working family must still be cached")
|
||||
require.True(t, marked, "a partial failure must be recorded so the failed family is retried")
|
||||
|
||||
assert.False(t, r.needsResolve(d), "within the backoff window the domain is not retried")
|
||||
|
||||
r.mutex.Lock()
|
||||
r.failedResolves[d] = time.Now().Add(-2 * refreshBackoff)
|
||||
r.mutex.Unlock()
|
||||
assert.True(t, r.needsResolve(d), "after the backoff elapses the domain is retried to pick up the missing family")
|
||||
}
|
||||
|
||||
// A family that returns NODATA (legitimately absent, e.g. an IPv4-only host) is
|
||||
// not a failure: the domain must not be marked for retry, otherwise it would be
|
||||
// re-resolved on every sync.
|
||||
func TestResolver_AddNewDomains_NodataIsNotFailure(t *testing.T) {
|
||||
d := domain.Domain("v4only.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("v4only.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, marked, "a NODATA family must not be recorded as a failure")
|
||||
assert.False(t, r.needsResolve(d), "an IPv4-only host must not be re-resolved on later syncs")
|
||||
}
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
@@ -53,6 +54,7 @@ import (
|
||||
"github.com/netbirdio/netbird/client/internal/relay"
|
||||
"github.com/netbirdio/netbird/client/internal/rosenpass"
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager"
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
||||
"github.com/netbirdio/netbird/client/internal/statemanager"
|
||||
"github.com/netbirdio/netbird/client/internal/syncstore"
|
||||
"github.com/netbirdio/netbird/client/internal/updater"
|
||||
@@ -88,6 +90,13 @@ var ErrResetConnection = fmt.Errorf("reset connection")
|
||||
|
||||
var ErrEngineAlreadyStarted = errors.New("engine already started")
|
||||
|
||||
// engineRestartCount and engineLastRestart track client-restart cadence across
|
||||
// engine recreations so a restart loop is distinguishable from rare restarts.
|
||||
var (
|
||||
engineRestartCount atomic.Int64
|
||||
engineLastRestart atomic.Int64
|
||||
)
|
||||
|
||||
type EngineConfig struct {
|
||||
WgPort int
|
||||
WgIfaceName string
|
||||
@@ -899,7 +908,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(started)
|
||||
log.Infof("sync finished in %s", duration)
|
||||
if update.GetNetworkMap() != nil {
|
||||
log.Infof("sync finished in %s, %d", duration, update.GetNetworkMap().GetSerial())
|
||||
} else {
|
||||
log.Infof("sync finished in %s", duration)
|
||||
}
|
||||
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
||||
}()
|
||||
e.syncMsgMux.Lock()
|
||||
@@ -909,14 +922,23 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if e.ctx.Err() != nil {
|
||||
return e.ctx.Err()
|
||||
}
|
||||
serial := update.GetNetworkMap().GetSerial()
|
||||
if nm := update.GetNetworkMap(); nm != nil {
|
||||
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
|
||||
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
|
||||
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
|
||||
} else {
|
||||
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
|
||||
}
|
||||
|
||||
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
@@ -927,17 +949,21 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if nm == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
e.persistSyncResponse(update)
|
||||
|
||||
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
|
||||
// only apply new changes and ignore old ones
|
||||
startTime = time.Now()
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
||||
|
||||
@@ -1357,44 +1383,56 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
startTime := time.Now()
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
startTime = time.Now()
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
// lazy mgr needs to be aware of which routes are available before they are applied
|
||||
if e.connMgr != nil {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
// Ingress forward rules
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
startTime = time.Now()
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
|
||||
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
@@ -1412,20 +1450,24 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
startTime = time.Now()
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
@@ -1438,9 +1480,11 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
@@ -1955,6 +1999,7 @@ func (e *Engine) newWgIface() (*iface.WGIface, error) {
|
||||
WGPrivKey: e.config.WgPrivateKey.String(),
|
||||
MTU: e.config.MTU,
|
||||
TransportNet: transportNet,
|
||||
FilterFn: e.addrViaRoutes,
|
||||
DisableDNS: e.config.DisableDNS,
|
||||
}
|
||||
|
||||
@@ -2171,7 +2216,14 @@ func (e *Engine) triggerClientRestart() {
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("restarting engine")
|
||||
// Cadence survives engine recreation (package-level), so a restart loop shows
|
||||
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
|
||||
n := engineRestartCount.Add(1)
|
||||
var sinceLast time.Duration
|
||||
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
|
||||
sinceLast = time.Since(time.Unix(0, prev))
|
||||
}
|
||||
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
|
||||
CtxGetState(e.ctx).Set(StatusConnecting)
|
||||
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
|
||||
log.Infof("cancelling client context, engine will be recreated")
|
||||
@@ -2202,6 +2254,21 @@ func (e *Engine) startNetworkMonitor() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *Engine) addrViaRoutes(addr netip.Addr) (bool, netip.Prefix, error) {
|
||||
var vpnRoutes []netip.Prefix
|
||||
for _, routes := range e.routeManager.GetClientRoutes() {
|
||||
if len(routes) > 0 && routes[0] != nil {
|
||||
vpnRoutes = append(vpnRoutes, routes[0].Network)
|
||||
}
|
||||
}
|
||||
|
||||
if isVpn, prefix := systemops.IsAddrRouted(addr, vpnRoutes); isVpn {
|
||||
return true, prefix, nil
|
||||
}
|
||||
|
||||
return false, netip.Prefix{}, nil
|
||||
}
|
||||
|
||||
func (e *Engine) stopDNSServer() {
|
||||
if e.dnsServer == nil {
|
||||
return
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -192,6 +193,7 @@ func (s *StatusChangeSubscription) Events() chan map[string]RouterState {
|
||||
// Pure read methods take RLock; anything that mutates state takes Lock.
|
||||
type Status struct {
|
||||
mux sync.RWMutex
|
||||
muxRelays sync.RWMutex
|
||||
peers map[string]State
|
||||
ipToKey map[string]string
|
||||
changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription
|
||||
@@ -226,6 +228,8 @@ type Status struct {
|
||||
|
||||
routeIDLookup routeIDLookup
|
||||
wgIface WGIfaceStatus
|
||||
|
||||
profile *StatusProfile
|
||||
}
|
||||
|
||||
// NewRecorder returns a new Status instance
|
||||
@@ -240,16 +244,23 @@ func NewRecorder(mgmAddress string) *Status {
|
||||
notifier: newNotifier(),
|
||||
mgmAddress: mgmAddress,
|
||||
resolvedDomainsStates: map[domain.Domain]ResolvedDomainInfo{},
|
||||
profile: NewStatusProfile(context.Background()),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Status) StartProfile(ctx context.Context) {
|
||||
d.profile.Start(ctx)
|
||||
}
|
||||
|
||||
func (d *Status) SetRelayMgr(manager *relayClient.Manager) {
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.profile.inc("SetRelayMgr")
|
||||
d.muxRelays.Lock()
|
||||
defer d.muxRelays.Unlock()
|
||||
d.relayMgr = manager
|
||||
}
|
||||
|
||||
func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
|
||||
d.profile.inc("SetIngressGwMgr")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.ingressGwMgr = ingressGwMgr
|
||||
@@ -257,6 +268,7 @@ func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
|
||||
|
||||
// ReplaceOfflinePeers replaces
|
||||
func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
||||
d.profile.inc("ReplaceOfflinePeers")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.offlinePeers = make([]State, len(replacement))
|
||||
@@ -268,6 +280,7 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
||||
|
||||
// AddPeer adds peer to Daemon status map
|
||||
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string) error {
|
||||
d.profile.inc("AddPeer")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -295,6 +308,7 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string)
|
||||
|
||||
// GetPeer adds peer to Daemon status map
|
||||
func (d *Status) GetPeer(peerPubKey string) (State, error) {
|
||||
d.profile.inc("GetPeer")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -306,6 +320,7 @@ func (d *Status) GetPeer(peerPubKey string) (State, error) {
|
||||
}
|
||||
|
||||
func (d *Status) PeerByIP(ip string) (string, bool) {
|
||||
d.profile.inc("PeerByIP")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -323,6 +338,7 @@ func (d *Status) PeerByIP(ip string) (string, bool) {
|
||||
// active peers are matched; peers moved into the offline slice by
|
||||
// ReplaceOfflinePeers are intentionally treated as unknown.
|
||||
func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
d.profile.inc("PeerStateByIP")
|
||||
if ip == "" {
|
||||
return State{}, false
|
||||
}
|
||||
@@ -341,6 +357,7 @@ func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
|
||||
// RemovePeer removes peer from Daemon status map
|
||||
func (d *Status) RemovePeer(peerPubKey string) error {
|
||||
d.profile.inc("RemovePeer")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -362,6 +379,7 @@ func (d *Status) RemovePeer(peerPubKey string) error {
|
||||
|
||||
// UpdatePeerState updates peer status
|
||||
func (d *Status) UpdatePeerState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -404,6 +422,7 @@ func (d *Status) UpdatePeerState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error {
|
||||
d.profile.inc("AddPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[peer]
|
||||
@@ -429,6 +448,7 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R
|
||||
}
|
||||
|
||||
func (d *Status) RemovePeerStateRoute(peer string, route string) error {
|
||||
d.profile.inc("RemovePeerStateRoute")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[peer]
|
||||
@@ -459,11 +479,13 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) {
|
||||
if d == nil {
|
||||
return nil, false
|
||||
}
|
||||
d.profile.inc("CheckRoutes")
|
||||
resId, isExitNode := d.routeIDLookup.Lookup(ip)
|
||||
return []byte(resId), isExitNode
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerICEState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerICEState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -503,6 +525,7 @@ func (d *Status) UpdatePeerICEState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerRelayedState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -539,6 +562,7 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerRelayedStateToDisconnected")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -574,6 +598,7 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerICEStateToDisconnected")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -613,6 +638,7 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
|
||||
|
||||
// UpdateWireGuardPeerState updates the WireGuard bits of the peer state
|
||||
func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats configurer.WGStats) error {
|
||||
d.profile.inc("UpdateWireGuardPeerState")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -640,6 +666,7 @@ func hasConnStatusChanged(oldStatus, newStatus ConnStatus) bool {
|
||||
|
||||
// UpdatePeerFQDN update peer's state fqdn only
|
||||
func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
||||
d.profile.inc("UpdatePeerFQDN")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -656,6 +683,7 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
||||
|
||||
// UpdatePeerSSHHostKey updates peer's SSH host key
|
||||
func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) error {
|
||||
d.profile.inc("UpdatePeerSSHHostKey")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -672,6 +700,7 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro
|
||||
|
||||
// FinishPeerListModifications this event invoke the notification
|
||||
func (d *Status) FinishPeerListModifications() {
|
||||
d.profile.inc("FinishPeerListModifications")
|
||||
d.mux.Lock()
|
||||
|
||||
if !d.peerListChangedForNotification {
|
||||
@@ -704,6 +733,7 @@ func (d *Status) FinishPeerListModifications() {
|
||||
}
|
||||
|
||||
func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription {
|
||||
d.profile.inc("SubscribeToPeerStateChanges")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -717,6 +747,7 @@ func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string)
|
||||
}
|
||||
|
||||
func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) {
|
||||
d.profile.inc("UnsubscribePeerStateChanges")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -742,6 +773,7 @@ func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscript
|
||||
|
||||
// GetLocalPeerState returns the local peer state
|
||||
func (d *Status) GetLocalPeerState() LocalPeerState {
|
||||
d.profile.inc("GetLocalPeerState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return d.localPeer.Clone()
|
||||
@@ -749,6 +781,7 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
|
||||
|
||||
// UpdateLocalPeerState updates local peer status
|
||||
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
|
||||
d.profile.inc("UpdateLocalPeerState")
|
||||
d.mux.Lock()
|
||||
d.localPeer = localPeerState
|
||||
fqdn := d.localPeer.FQDN
|
||||
@@ -763,6 +796,7 @@ func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
|
||||
|
||||
// AddLocalPeerStateRoute adds a route to the local peer state
|
||||
func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
|
||||
d.profile.inc("AddLocalPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -780,6 +814,7 @@ func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
|
||||
|
||||
// RemoveLocalPeerStateRoute removes a route from the local peer state
|
||||
func (d *Status) RemoveLocalPeerStateRoute(route string) {
|
||||
d.profile.inc("RemoveLocalPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -793,6 +828,7 @@ func (d *Status) RemoveLocalPeerStateRoute(route string) {
|
||||
|
||||
// AddResolvedIPLookupEntry adds a resolved IP lookup entry
|
||||
func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.ResID) {
|
||||
d.profile.inc("AddResolvedIPLookupEntry")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -801,6 +837,7 @@ func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.
|
||||
|
||||
// RemoveResolvedIPLookupEntry removes a resolved IP lookup entry
|
||||
func (d *Status) RemoveResolvedIPLookupEntry(route string) {
|
||||
d.profile.inc("RemoveResolvedIPLookupEntry")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -812,6 +849,7 @@ func (d *Status) RemoveResolvedIPLookupEntry(route string) {
|
||||
|
||||
// CleanLocalPeerStateRoutes cleans all routes from the local peer state
|
||||
func (d *Status) CleanLocalPeerStateRoutes() {
|
||||
d.profile.inc("CleanLocalPeerStateRoutes")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -820,6 +858,7 @@ func (d *Status) CleanLocalPeerStateRoutes() {
|
||||
|
||||
// CleanLocalPeerState cleans local peer status
|
||||
func (d *Status) CleanLocalPeerState() {
|
||||
d.profile.inc("CleanLocalPeerState")
|
||||
d.mux.Lock()
|
||||
d.localPeer = LocalPeerState{}
|
||||
fqdn := d.localPeer.FQDN
|
||||
@@ -831,6 +870,7 @@ func (d *Status) CleanLocalPeerState() {
|
||||
|
||||
// MarkManagementDisconnected sets ManagementState to disconnected
|
||||
func (d *Status) MarkManagementDisconnected(err error) {
|
||||
d.profile.inc("MarkManagementDisconnected")
|
||||
d.mux.Lock()
|
||||
d.managementState = false
|
||||
d.managementError = err
|
||||
@@ -843,6 +883,7 @@ func (d *Status) MarkManagementDisconnected(err error) {
|
||||
|
||||
// MarkManagementConnected sets ManagementState to connected
|
||||
func (d *Status) MarkManagementConnected() {
|
||||
d.profile.inc("MarkManagementConnected")
|
||||
d.mux.Lock()
|
||||
d.managementState = true
|
||||
d.managementError = nil
|
||||
@@ -855,6 +896,7 @@ func (d *Status) MarkManagementConnected() {
|
||||
|
||||
// UpdateSignalAddress update the address of the signal server
|
||||
func (d *Status) UpdateSignalAddress(signalURL string) {
|
||||
d.profile.inc("UpdateSignalAddress")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.signalAddress = signalURL
|
||||
@@ -862,6 +904,7 @@ func (d *Status) UpdateSignalAddress(signalURL string) {
|
||||
|
||||
// UpdateManagementAddress update the address of the management server
|
||||
func (d *Status) UpdateManagementAddress(mgmAddress string) {
|
||||
d.profile.inc("UpdateManagementAddress")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.mgmAddress = mgmAddress
|
||||
@@ -869,6 +912,7 @@ func (d *Status) UpdateManagementAddress(mgmAddress string) {
|
||||
|
||||
// UpdateRosenpass update the Rosenpass configuration
|
||||
func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
|
||||
d.profile.inc("UpdateRosenpass")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.rosenpassPermissive = rosenpassPermissive
|
||||
@@ -876,6 +920,7 @@ func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateLazyConnection(enabled bool) {
|
||||
d.profile.inc("UpdateLazyConnection")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.lazyConnectionEnabled = enabled
|
||||
@@ -883,6 +928,7 @@ func (d *Status) UpdateLazyConnection(enabled bool) {
|
||||
|
||||
// MarkSignalDisconnected sets SignalState to disconnected
|
||||
func (d *Status) MarkSignalDisconnected(err error) {
|
||||
d.profile.inc("MarkSignalDisconnected")
|
||||
d.mux.Lock()
|
||||
d.signalState = false
|
||||
d.signalError = err
|
||||
@@ -895,6 +941,7 @@ func (d *Status) MarkSignalDisconnected(err error) {
|
||||
|
||||
// MarkSignalConnected sets SignalState to connected
|
||||
func (d *Status) MarkSignalConnected() {
|
||||
d.profile.inc("MarkSignalConnected")
|
||||
d.mux.Lock()
|
||||
d.signalState = true
|
||||
d.signalError = nil
|
||||
@@ -906,18 +953,21 @@ func (d *Status) MarkSignalConnected() {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) {
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.profile.inc("UpdateRelayStates")
|
||||
d.muxRelays.Lock()
|
||||
defer d.muxRelays.Unlock()
|
||||
d.relayStates = relayResults
|
||||
}
|
||||
|
||||
func (d *Status) UpdateDNSStates(dnsStates []NSGroupState) {
|
||||
d.profile.inc("UpdateDNSStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.nsGroupStates = dnsStates
|
||||
}
|
||||
|
||||
func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix, resourceId route.ResID) {
|
||||
d.profile.inc("UpdateResolvedDomainsStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -933,6 +983,7 @@ func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resol
|
||||
}
|
||||
|
||||
func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
|
||||
d.profile.inc("DeleteResolvedDomainsStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -949,6 +1000,7 @@ func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
|
||||
}
|
||||
|
||||
func (d *Status) GetRosenpassState() RosenpassState {
|
||||
d.profile.inc("GetRosenpassState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return RosenpassState{
|
||||
@@ -958,12 +1010,14 @@ func (d *Status) GetRosenpassState() RosenpassState {
|
||||
}
|
||||
|
||||
func (d *Status) GetLazyConnection() bool {
|
||||
d.profile.inc("GetLazyConnection")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return d.lazyConnectionEnabled
|
||||
}
|
||||
|
||||
func (d *Status) GetManagementState() ManagementState {
|
||||
d.profile.inc("GetManagementState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return ManagementState{
|
||||
@@ -974,6 +1028,7 @@ func (d *Status) GetManagementState() ManagementState {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
|
||||
d.profile.inc("UpdateLatency")
|
||||
if latency <= 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -991,6 +1046,7 @@ func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
|
||||
|
||||
// IsLoginRequired determines if a peer's login has expired.
|
||||
func (d *Status) IsLoginRequired() bool {
|
||||
d.profile.inc("IsLoginRequired")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -1007,6 +1063,7 @@ func (d *Status) IsLoginRequired() bool {
|
||||
}
|
||||
|
||||
func (d *Status) GetSignalState() SignalState {
|
||||
d.profile.inc("GetSignalState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return SignalState{
|
||||
@@ -1018,24 +1075,34 @@ func (d *Status) GetSignalState() SignalState {
|
||||
|
||||
// GetRelayStates returns the stun/turn/permanent relay states
|
||||
func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
d.profile.inc("GetRelayStates")
|
||||
d.muxRelays.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("GetRelayStates", started)
|
||||
}()
|
||||
|
||||
if d.relayMgr == nil {
|
||||
defer d.muxRelays.RUnlock()
|
||||
return d.relayStates
|
||||
}
|
||||
|
||||
relayMgr := d.relayMgr
|
||||
// extend the list of stun, turn servers with the relay server connections
|
||||
relayStates := slices.Clone(d.relayStates)
|
||||
d.muxRelays.RUnlock()
|
||||
|
||||
states := d.relayMgr.RelayStates()
|
||||
states := relayMgr.RelayStates()
|
||||
if len(states) == 0 {
|
||||
// no relay connection tracked yet; surface configured servers as
|
||||
// unavailable with the real reconnect error when known
|
||||
err := relayClient.ErrRelayClientNotConnected
|
||||
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
|
||||
if connErr := relayMgr.RelayConnectError(); connErr != nil {
|
||||
err = connErr
|
||||
}
|
||||
for _, r := range d.relayMgr.ServerURLs() {
|
||||
for _, r := range relayMgr.ServerURLs() {
|
||||
relayStates = append(relayStates, relay.ProbeResult{
|
||||
URI: r,
|
||||
Err: err,
|
||||
@@ -1055,7 +1122,15 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
}
|
||||
|
||||
func (d *Status) ForwardingRules() []firewall.ForwardRule {
|
||||
d.profile.inc("ForwardingRules")
|
||||
d.mux.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("ForwardingRules", started)
|
||||
}()
|
||||
|
||||
defer d.mux.RUnlock()
|
||||
if d.ingressGwMgr == nil {
|
||||
return nil
|
||||
@@ -1065,6 +1140,7 @@ func (d *Status) ForwardingRules() []firewall.ForwardRule {
|
||||
}
|
||||
|
||||
func (d *Status) GetDNSStates() []NSGroupState {
|
||||
d.profile.inc("GetDNSStates")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -1073,6 +1149,7 @@ func (d *Status) GetDNSStates() []NSGroupState {
|
||||
}
|
||||
|
||||
func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo {
|
||||
d.profile.inc("GetResolvedDomainsStates")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return maps.Clone(d.resolvedDomainsStates)
|
||||
@@ -1080,6 +1157,7 @@ func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo
|
||||
|
||||
// GetFullStatus gets full status
|
||||
func (d *Status) GetFullStatus() FullStatus {
|
||||
d.profile.inc("GetFullStatus")
|
||||
fullStatus := FullStatus{
|
||||
ManagementState: d.GetManagementState(),
|
||||
SignalState: d.GetSignalState(),
|
||||
@@ -1106,26 +1184,31 @@ func (d *Status) GetFullStatus() FullStatus {
|
||||
|
||||
// ClientStart will notify all listeners about the new service state
|
||||
func (d *Status) ClientStart() {
|
||||
d.profile.inc("ClientStart")
|
||||
d.notifier.clientStart()
|
||||
}
|
||||
|
||||
// ClientStop will notify all listeners about the new service state
|
||||
func (d *Status) ClientStop() {
|
||||
d.profile.inc("ClientStop")
|
||||
d.notifier.clientStop()
|
||||
}
|
||||
|
||||
// ClientTeardown will notify all listeners about the service is under teardown
|
||||
func (d *Status) ClientTeardown() {
|
||||
d.profile.inc("ClientTeardown")
|
||||
d.notifier.clientTearDown()
|
||||
}
|
||||
|
||||
// SetConnectionListener set a listener to the notifier
|
||||
func (d *Status) SetConnectionListener(listener Listener) {
|
||||
d.profile.inc("SetConnectionListener")
|
||||
d.notifier.setListener(listener)
|
||||
}
|
||||
|
||||
// RemoveConnectionListener remove the listener from the notifier
|
||||
func (d *Status) RemoveConnectionListener() {
|
||||
d.profile.inc("RemoveConnectionListener")
|
||||
d.notifier.removeListener()
|
||||
}
|
||||
|
||||
@@ -1197,6 +1280,7 @@ func (d *Status) PublishEvent(
|
||||
userMsg string,
|
||||
metadata map[string]string,
|
||||
) {
|
||||
d.profile.inc("PublishEvent")
|
||||
event := &proto.SystemEvent{
|
||||
Id: uuid.New().String(),
|
||||
Severity: severity,
|
||||
@@ -1208,6 +1292,13 @@ func (d *Status) PublishEvent(
|
||||
}
|
||||
|
||||
d.eventMux.Lock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("PublishEvent", started)
|
||||
}()
|
||||
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
d.eventQueue.Add(event)
|
||||
@@ -1225,6 +1316,7 @@ func (d *Status) PublishEvent(
|
||||
|
||||
// SubscribeToEvents returns a new event subscription
|
||||
func (d *Status) SubscribeToEvents() *EventSubscription {
|
||||
d.profile.inc("SubscribeToEvents")
|
||||
d.eventMux.Lock()
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
@@ -1240,6 +1332,7 @@ func (d *Status) SubscribeToEvents() *EventSubscription {
|
||||
|
||||
// UnsubscribeFromEvents removes an event subscription
|
||||
func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
|
||||
d.profile.inc("UnsubscribeFromEvents")
|
||||
if sub == nil {
|
||||
return
|
||||
}
|
||||
@@ -1255,10 +1348,12 @@ func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
|
||||
|
||||
// GetEventHistory returns all events in the queue
|
||||
func (d *Status) GetEventHistory() []*proto.SystemEvent {
|
||||
d.profile.inc("GetEventHistory")
|
||||
return d.eventQueue.GetAll()
|
||||
}
|
||||
|
||||
func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
|
||||
d.profile.inc("SetWgIface")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -1266,7 +1361,15 @@ func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
|
||||
}
|
||||
|
||||
func (d *Status) PeersStatus() (*configurer.Stats, error) {
|
||||
d.profile.inc("PeersStatus")
|
||||
d.mux.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("PeersStatus", started)
|
||||
}()
|
||||
|
||||
defer d.mux.RUnlock()
|
||||
if d.wgIface == nil {
|
||||
return nil, fmt.Errorf("wgInterface is nil, cannot retrieve peers status")
|
||||
@@ -1279,7 +1382,15 @@ func (d *Status) PeersStatus() (*configurer.Stats, error) {
|
||||
// and updates the cached peer states. This ensures accurate handshake times and
|
||||
// transfer statistics in status reports without running full health probes.
|
||||
func (d *Status) RefreshWireGuardStats() error {
|
||||
d.profile.inc("RefreshWireGuardStats")
|
||||
d.mux.Lock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("RefreshWireGuardStats", started)
|
||||
}()
|
||||
|
||||
defer d.mux.Unlock()
|
||||
|
||||
if d.wgIface == nil {
|
||||
@@ -1444,3 +1555,10 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
|
||||
|
||||
return &pbFullStatus
|
||||
}
|
||||
|
||||
func debugElapsed(msg string, startTime time.Time) {
|
||||
if elapsed := time.Since(startTime); elapsed > 1*time.Second {
|
||||
log.Infof("run %s took %s", msg, elapsed)
|
||||
debug.PrintStack()
|
||||
}
|
||||
}
|
||||
|
||||
97
client/internal/peer/status_profile.go
Normal file
97
client/internal/peer/status_profile.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type StatusProfile struct {
|
||||
counts sync.Map
|
||||
}
|
||||
|
||||
func NewStatusProfile(ctx context.Context) *StatusProfile {
|
||||
s := &StatusProfile{}
|
||||
go s.Start(ctx)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *StatusProfile) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.logCounts()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusProfile) inc(method string) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
if v, ok := s.counts.Load(method); ok {
|
||||
v.(*atomic.Int64).Add(1)
|
||||
return
|
||||
}
|
||||
cnt := &atomic.Int64{}
|
||||
actual, _ := s.counts.LoadOrStore(method, cnt)
|
||||
actual.(*atomic.Int64).Add(1)
|
||||
}
|
||||
|
||||
func (s *StatusProfile) snapshot() map[string]int64 {
|
||||
out := make(map[string]int64)
|
||||
s.counts.Range(func(k, v any) bool {
|
||||
out[k.(string)] = v.(*atomic.Int64).Load()
|
||||
return true
|
||||
})
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *StatusProfile) logCounts() {
|
||||
counts := s.snapshot()
|
||||
if len(counts) == 0 {
|
||||
log.Infof("status profile: no Status method calls so far")
|
||||
return
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
method string
|
||||
count int64
|
||||
}
|
||||
sorted := make([]kv, 0, len(counts))
|
||||
var total int64
|
||||
for m, c := range counts {
|
||||
if c == 0 {
|
||||
continue
|
||||
}
|
||||
sorted = append(sorted, kv{m, c})
|
||||
total += c
|
||||
}
|
||||
sort.Slice(sorted, func(i, j int) bool {
|
||||
if sorted[i].count != sorted[j].count {
|
||||
return sorted[i].count > sorted[j].count
|
||||
}
|
||||
return sorted[i].method < sorted[j].method
|
||||
})
|
||||
|
||||
var b strings.Builder
|
||||
for i, e := range sorted {
|
||||
if i > 0 {
|
||||
b.WriteString(", ")
|
||||
}
|
||||
b.WriteString(e.method)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.FormatInt(e.count, 10))
|
||||
}
|
||||
log.Infof("status profile (cumulative total=%d): %s", total, b.String())
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -164,6 +165,10 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
|
||||
return
|
||||
}
|
||||
|
||||
if candidateViaRoutes(candidate, haRoutes) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.agent.AddRemoteCandidate(candidate); err != nil {
|
||||
w.log.Errorf("error while handling remote candidate")
|
||||
return
|
||||
@@ -461,7 +466,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
|
||||
}
|
||||
|
||||
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
|
||||
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.config.Key)
|
||||
|
||||
pairStat, ok := agent.GetSelectedCandidatePairStats()
|
||||
@@ -584,6 +589,34 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
|
||||
return ec, nil
|
||||
}
|
||||
|
||||
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
|
||||
addr, err := netip.ParseAddr(candidate.Address())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
|
||||
return false
|
||||
}
|
||||
|
||||
var routePrefixes []netip.Prefix
|
||||
for _, routes := range clientRoutes {
|
||||
if len(routes) > 0 && routes[0] != nil {
|
||||
routePrefixes = append(routePrefixes, routes[0].Network)
|
||||
}
|
||||
}
|
||||
|
||||
for _, prefix := range routePrefixes {
|
||||
// default route is handled by route exclusion / ip rules
|
||||
if prefix.Bits() == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if prefix.Contains(addr) {
|
||||
log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isRelayCandidate(candidate ice.Candidate) bool {
|
||||
return candidate.Type() == ice.CandidateTypeRelay
|
||||
}
|
||||
|
||||
@@ -121,12 +121,9 @@ func (r *SysOps) addRouteToNonVPNIntf(prefix netip.Prefix, vpnIntf wgIface, init
|
||||
return Nexthop{}, vars.ErrRouteNotAllowed
|
||||
}
|
||||
|
||||
// BSDs blackhole a /32 added inside a directly-connected subnet; Linux/Windows need it to beat the wt0 route.
|
||||
switch runtime.GOOS {
|
||||
case "darwin", "freebsd", "netbsd", "openbsd", "dragonfly":
|
||||
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
|
||||
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
|
||||
}
|
||||
// Check if the prefix is part of any local subnets
|
||||
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
|
||||
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
|
||||
}
|
||||
|
||||
// Determine the exit interface and next hop for the prefix, so we can add a specific route
|
||||
|
||||
@@ -85,9 +85,7 @@ type Client struct {
|
||||
|
||||
stateMu sync.RWMutex
|
||||
connectClient *internal.ConnectClient
|
||||
// config holds the active configuration once Run has loaded it. Consumed by
|
||||
// the in-app SSH client for the NetBird SSH key and the OAuth flow.
|
||||
config *profilemanager.Config
|
||||
config *profilemanager.Config
|
||||
}
|
||||
|
||||
// NewClient instantiate a new Client
|
||||
@@ -172,7 +170,6 @@ func (c *Client) Run(fd int32, interfaceName string, envList *EnvList) error {
|
||||
ctx = internal.CtxInitState(ctx)
|
||||
c.onHostDnsFn = func([]string) {}
|
||||
cfg.WgIface = interfaceName
|
||||
c.config = cfg
|
||||
|
||||
connectClient := internal.NewConnectClient(ctx, cfg, c.recorder)
|
||||
c.setState(cfg, connectClient)
|
||||
@@ -701,13 +698,6 @@ func (c *Client) stateSnapshot() (*profilemanager.Config, *internal.ConnectClien
|
||||
return c.config, c.connectClient
|
||||
}
|
||||
|
||||
// sshState returns the active config and the running connect client for the
|
||||
// in-app SSH client. Both are nil until Run has loaded the config and started
|
||||
// the tunnel.
|
||||
func (c *Client) sshState() (*profilemanager.Config, *internal.ConnectClient) {
|
||||
return c.stateSnapshot()
|
||||
}
|
||||
|
||||
func formatDuration(d time.Duration) string {
|
||||
ds := d.String()
|
||||
dotIndex := strings.Index(ds, ".")
|
||||
|
||||
@@ -1,434 +0,0 @@
|
||||
//go:build ios
|
||||
|
||||
package NetBirdSDK
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
gossh "golang.org/x/crypto/ssh"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal"
|
||||
"github.com/netbirdio/netbird/client/internal/auth"
|
||||
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
||||
nbssh "github.com/netbirdio/netbird/client/ssh"
|
||||
"github.com/netbirdio/netbird/client/ssh/detection"
|
||||
)
|
||||
|
||||
const (
|
||||
sshDialTimeout = 30 * time.Second
|
||||
sshDetectionTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
// SSHTerminalListener receives SSH session events. It is implemented in Swift.
|
||||
//
|
||||
// All callbacks are invoked from goroutines and may run concurrently with each
|
||||
// other; the implementation must be safe to call from any thread.
|
||||
type SSHTerminalListener interface {
|
||||
OnConnected()
|
||||
OnData(data []byte)
|
||||
OnClose(reason string)
|
||||
OnError(message string)
|
||||
}
|
||||
|
||||
// SSHClient is a NetBird-aware SSH client exposed to Swift via gomobile.
|
||||
//
|
||||
// It dials through the running NetBird tunnel and runs a standard SSH session
|
||||
// on top with PTY enabled. Host-key verification uses the NetBird-provided
|
||||
// peer SSH host keys, identical to the desktop client.
|
||||
type SSHClient struct {
|
||||
nb *Client
|
||||
mu sync.Mutex
|
||||
listener SSHTerminalListener
|
||||
urlOpener URLOpener
|
||||
|
||||
sshClient *gossh.Client
|
||||
session *gossh.Session
|
||||
stdin io.WriteCloser
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewSSHClient creates a new SSH client bound to the running NetBird Client.
|
||||
func NewSSHClient(c *Client) *SSHClient {
|
||||
return &SSHClient{nb: c}
|
||||
}
|
||||
|
||||
// SetListener registers the Swift listener. Must be called before Connect to
|
||||
// receive any events.
|
||||
func (s *SSHClient) SetListener(l SSHTerminalListener) {
|
||||
s.mu.Lock()
|
||||
s.listener = l
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// SetURLOpener registers the Swift URL opener used to display the device-code
|
||||
// authorization page in an in-app browser when the target peer requires JWT
|
||||
// authentication. Must be set before Connect to be effective.
|
||||
func (s *SSHClient) SetURLOpener(opener URLOpener) {
|
||||
s.mu.Lock()
|
||||
s.urlOpener = opener
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
// Connect dials the SSH server through the NetBird tunnel and performs the
|
||||
// SSH handshake. It auto-detects the server type via SSH banner inspection
|
||||
// and selects the appropriate authentication path:
|
||||
//
|
||||
// - NetBird-SSH server requiring JWT: launches the OAuth 2.0 device-code
|
||||
// flow, opens the verification URL through the registered URLOpener, and
|
||||
// uses the resulting token as the SSH password. Host-key verification
|
||||
// uses the NetBird peer registry.
|
||||
// - NetBird-SSH server without JWT: authenticates with the NetBird SSH
|
||||
// private key. Host-key verification uses the NetBird peer registry.
|
||||
// - Regular SSH server (e.g. OpenSSH): authenticates with the NetBird key
|
||||
// first (so a user-installed NetBird public key works), then falls back
|
||||
// to the supplied password if non-empty. Host-key verification is
|
||||
// disabled (TOFU pending).
|
||||
//
|
||||
// The password parameter is only consulted for regular SSH servers.
|
||||
func (s *SSHClient) Connect(host string, port int, user, password string) error {
|
||||
cfg, cc := s.nb.sshState()
|
||||
if cc == nil {
|
||||
return errors.New("netbird client not running")
|
||||
}
|
||||
if cfg == nil {
|
||||
return errors.New("netbird config not loaded")
|
||||
}
|
||||
engine := cc.Engine()
|
||||
if engine == nil {
|
||||
return errors.New("netbird engine not available")
|
||||
}
|
||||
|
||||
serverType := detectServerType(host, port)
|
||||
log.Infof("SSH server type for %s:%d: %s", host, port, serverType)
|
||||
|
||||
authMethods, hostKeyCallback, err := s.buildAuth(cfg, engine, serverType, password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
clientConfig := &gossh.ClientConfig{
|
||||
User: user,
|
||||
Auth: authMethods,
|
||||
HostKeyCallback: hostKeyCallback,
|
||||
Timeout: sshDialTimeout,
|
||||
}
|
||||
return s.dialAndHandshake(host, port, clientConfig)
|
||||
}
|
||||
|
||||
// StartSession requests a PTY and starts an interactive shell. Output from
|
||||
// the session is forwarded to the listener via OnData.
|
||||
func (s *SSHClient) StartSession(cols, rows int) error {
|
||||
log.Debugf("SSH: starting session %dx%d", cols, rows)
|
||||
s.mu.Lock()
|
||||
sshClient := s.sshClient
|
||||
s.mu.Unlock()
|
||||
|
||||
if sshClient == nil {
|
||||
return errors.New("ssh client not connected")
|
||||
}
|
||||
|
||||
session, err := sshClient.NewSession()
|
||||
if err != nil {
|
||||
return fmt.Errorf("new session: %w", err)
|
||||
}
|
||||
|
||||
modes := gossh.TerminalModes{
|
||||
gossh.ECHO: 1,
|
||||
gossh.TTY_OP_ISPEED: 14400,
|
||||
gossh.TTY_OP_OSPEED: 14400,
|
||||
gossh.VINTR: 3,
|
||||
gossh.VQUIT: 28,
|
||||
gossh.VERASE: 127,
|
||||
}
|
||||
if err := session.RequestPty("xterm-256color", rows, cols, modes); err != nil {
|
||||
closeQuiet(session, "session after pty error")
|
||||
return fmt.Errorf("request pty: %w", err)
|
||||
}
|
||||
|
||||
stdin, err := session.StdinPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stdin error")
|
||||
return fmt.Errorf("stdin pipe: %w", err)
|
||||
}
|
||||
stdout, err := session.StdoutPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stdout error")
|
||||
return fmt.Errorf("stdout pipe: %w", err)
|
||||
}
|
||||
stderr, err := session.StderrPipe()
|
||||
if err != nil {
|
||||
closeQuiet(session, "session after stderr error")
|
||||
return fmt.Errorf("stderr pipe: %w", err)
|
||||
}
|
||||
|
||||
if err := session.Shell(); err != nil {
|
||||
closeQuiet(session, "session after shell error")
|
||||
return fmt.Errorf("start shell: %w", err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.session = session
|
||||
s.stdin = stdin
|
||||
s.mu.Unlock()
|
||||
|
||||
go s.readLoop(stdout, "stdout")
|
||||
go s.readLoop(stderr, "stderr")
|
||||
log.Debug("SSH: session started, shell running")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write sends data to the SSH session stdin.
|
||||
func (s *SSHClient) Write(data []byte) error {
|
||||
s.mu.Lock()
|
||||
stdin := s.stdin
|
||||
s.mu.Unlock()
|
||||
if stdin == nil {
|
||||
return errors.New("ssh session not started")
|
||||
}
|
||||
if _, err := stdin.Write(data); err != nil {
|
||||
return fmt.Errorf("write stdin: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Resize updates the PTY window size.
|
||||
func (s *SSHClient) Resize(cols, rows int) error {
|
||||
s.mu.Lock()
|
||||
session := s.session
|
||||
s.mu.Unlock()
|
||||
if session == nil {
|
||||
return errors.New("ssh session not started")
|
||||
}
|
||||
return session.WindowChange(rows, cols)
|
||||
}
|
||||
|
||||
// Close terminates the SSH session and underlying connection. Safe to call
|
||||
// multiple times.
|
||||
func (s *SSHClient) Close() error {
|
||||
s.mu.Lock()
|
||||
sshClient := s.sshClient
|
||||
session := s.session
|
||||
stdin := s.stdin
|
||||
s.sshClient = nil
|
||||
s.session = nil
|
||||
s.stdin = nil
|
||||
s.mu.Unlock()
|
||||
|
||||
if stdin != nil {
|
||||
if err := stdin.Close(); err != nil {
|
||||
log.Debugf("ssh: stdin close: %v", err)
|
||||
}
|
||||
}
|
||||
if session != nil {
|
||||
if err := session.Close(); err != nil && !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh: session close: %v", err)
|
||||
}
|
||||
}
|
||||
var firstErr error
|
||||
if sshClient != nil {
|
||||
if err := sshClient.Close(); err != nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
s.notifyClose("closed by client")
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (s *SSHClient) buildAuth(cfg *profilemanager.Config, engine *internal.Engine,
|
||||
serverType detection.ServerType, password string) ([]gossh.AuthMethod, gossh.HostKeyCallback, error) {
|
||||
|
||||
switch serverType {
|
||||
case detection.ServerTypeNetBirdJWT:
|
||||
token, err := s.requestJWTToken(cfg)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("jwt: %w", err)
|
||||
}
|
||||
auths := []gossh.AuthMethod{gossh.Password(token)}
|
||||
return auths, nbssh.CreateHostKeyCallback(&engineHostKeyVerifier{engine: engine}), nil
|
||||
|
||||
case detection.ServerTypeNetBirdNoJWT:
|
||||
if cfg.SSHKey == "" {
|
||||
return nil, nil, errors.New("no NetBird SSH key available")
|
||||
}
|
||||
signer, err := gossh.ParsePrivateKey([]byte(cfg.SSHKey))
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("parse netbird ssh key: %w", err)
|
||||
}
|
||||
auths := []gossh.AuthMethod{gossh.PublicKeys(signer)}
|
||||
return auths, nbssh.CreateHostKeyCallback(&engineHostKeyVerifier{engine: engine}), nil
|
||||
|
||||
default: // regular SSH
|
||||
var auths []gossh.AuthMethod
|
||||
if cfg.SSHKey != "" {
|
||||
if signer, err := gossh.ParsePrivateKey([]byte(cfg.SSHKey)); err == nil {
|
||||
auths = append(auths, gossh.PublicKeys(signer))
|
||||
} else {
|
||||
log.Debugf("ssh: parse netbird key for regular auth: %v", err)
|
||||
}
|
||||
}
|
||||
if password != "" {
|
||||
pw := password
|
||||
auths = append(auths, gossh.Password(pw))
|
||||
auths = append(auths, gossh.KeyboardInteractive(func(_, _ string, questions []string, _ []bool) ([]string, error) {
|
||||
answers := make([]string, len(questions))
|
||||
for i := range questions {
|
||||
answers[i] = pw
|
||||
}
|
||||
return answers, nil
|
||||
}))
|
||||
}
|
||||
if len(auths) == 0 {
|
||||
return nil, nil, errors.New("no auth method available: provide a password or configure NetBird SSH key")
|
||||
}
|
||||
return auths, gossh.InsecureIgnoreHostKey(), nil // nolint:gosec // TOFU not yet implemented
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSHClient) requestJWTToken(cfg *profilemanager.Config) (string, error) {
|
||||
s.mu.Lock()
|
||||
urlOpener := s.urlOpener
|
||||
s.mu.Unlock()
|
||||
if urlOpener == nil {
|
||||
return "", errors.New("URL opener not configured for JWT auth")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
flow, err := auth.NewOAuthFlow(ctx, cfg, false, true, profilemanager.GetLoginHint())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create oauth flow: %w", err)
|
||||
}
|
||||
|
||||
flowInfo, err := flow.RequestAuthInfo(ctx)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("request auth info: %w", err)
|
||||
}
|
||||
|
||||
go urlOpener.Open(flowInfo.VerificationURIComplete, flowInfo.UserCode)
|
||||
|
||||
tokenInfo, err := flow.WaitToken(ctx, flowInfo)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("wait for token: %w", err)
|
||||
}
|
||||
|
||||
token := tokenInfo.GetTokenToUse()
|
||||
if token == "" {
|
||||
return "", errors.New("empty token returned by IdP")
|
||||
}
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func (s *SSHClient) dialAndHandshake(host string, port int, clientConfig *gossh.ClientConfig) error {
|
||||
addr := net.JoinHostPort(host, strconv.Itoa(port))
|
||||
log.Infof("SSH: connecting to %s as %s", addr, clientConfig.User)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), sshDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
var dialer net.Dialer
|
||||
conn, err := dialer.DialContext(ctx, "tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("dial %s: %w", addr, err)
|
||||
}
|
||||
|
||||
sshConn, chans, reqs, err := gossh.NewClientConn(conn, addr, clientConfig)
|
||||
if err != nil {
|
||||
if cerr := conn.Close(); cerr != nil {
|
||||
log.Debugf("ssh: close after handshake error: %v", cerr)
|
||||
}
|
||||
return fmt.Errorf("ssh handshake: %w", err)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
s.sshClient = gossh.NewClient(sshConn, chans, reqs)
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
|
||||
log.Infof("SSH: connected to %s", addr)
|
||||
if listener != nil {
|
||||
listener.OnConnected()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SSHClient) readLoop(r io.Reader, name string) {
|
||||
buf := make([]byte, 4096)
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
if n > 0 {
|
||||
s.mu.Lock()
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
if listener != nil {
|
||||
chunk := make([]byte, n)
|
||||
copy(chunk, buf[:n])
|
||||
listener.OnData(chunk)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh %s read: %v", name, err)
|
||||
}
|
||||
s.notifyClose(err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SSHClient) notifyClose(reason string) {
|
||||
s.mu.Lock()
|
||||
if s.closed {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.closed = true
|
||||
listener := s.listener
|
||||
s.mu.Unlock()
|
||||
if listener != nil {
|
||||
listener.OnClose(reason)
|
||||
}
|
||||
}
|
||||
|
||||
// engineHostKeyVerifier adapts *internal.Engine to nbssh.HostKeyVerifier.
|
||||
type engineHostKeyVerifier struct {
|
||||
engine *internal.Engine
|
||||
}
|
||||
|
||||
func (v *engineHostKeyVerifier) VerifySSHHostKey(peerAddress string, presented []byte) error {
|
||||
storedKey, found := v.engine.GetPeerSSHKey(peerAddress)
|
||||
if !found {
|
||||
return nbssh.ErrPeerNotFound
|
||||
}
|
||||
return nbssh.VerifyHostKey(storedKey, presented, peerAddress)
|
||||
}
|
||||
|
||||
func detectServerType(host string, port int) detection.ServerType {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), sshDetectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
dialer := &net.Dialer{}
|
||||
serverType, err := detection.DetectSSHServerType(ctx, dialer, host, port)
|
||||
if err != nil {
|
||||
log.Debugf("ssh: server detection for %s:%d failed: %v (assuming regular SSH)", host, port, err)
|
||||
return detection.ServerTypeRegular
|
||||
}
|
||||
return serverType
|
||||
}
|
||||
|
||||
func closeQuiet(c io.Closer, label string) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if err := c.Close(); err != nil && !errors.Is(err, io.EOF) {
|
||||
log.Debugf("ssh: close %s: %v", label, err)
|
||||
}
|
||||
}
|
||||
@@ -136,6 +136,7 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
|
||||
networksDisabled: networksDisabled,
|
||||
jwtCache: newJWTCache(),
|
||||
}
|
||||
go s.statusRecorder.StartProfile(ctx)
|
||||
agent := &serverAgent{s}
|
||||
s.sleepHandler = sleephandler.New(agent)
|
||||
s.startSleepDetector()
|
||||
|
||||
@@ -1916,6 +1916,117 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SchedulesInactivityExpiration(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: true,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
// Establish a session so the matching-token disconnect is actually applied.
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
// Install the mock only now, so the assertion observes the disconnect, not
|
||||
// the earlier connect.
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
// expected: disconnect re-armed the inactivity expiry timer
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected inactivity expiration to be rescheduled when an eligible peer disconnects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SkipsInactivityExpirationWhenDisabled(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
// Peer is eligible (SSO + inactivity enabled) but the account-level setting
|
||||
// stays disabled, so disconnect must not schedule anything.
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: false,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
t.Fatal("inactivity expiration must not be scheduled while the account-level setting is disabled")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
// expected: nothing scheduled
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
@@ -188,6 +188,15 @@ func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerP
|
||||
}
|
||||
}
|
||||
|
||||
if peer.AddedWithSSOLogin() && peer.InactivityExpirationEnabled {
|
||||
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed getting account settings to schedule inactivity expiration for peer %s: %v", peer.ID, err)
|
||||
} else if settings.PeerInactivityExpirationEnabled {
|
||||
am.checkAndSchedulePeerInactivityExpiration(ctx, accountID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -243,7 +243,11 @@ func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore
|
||||
|
||||
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
c.log.Infof("connecting to relay server")
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
c.log.Infof("connect elapsed time: %v", time.Since(start))
|
||||
}()
|
||||
|
||||
c.readLoopMutex.Lock()
|
||||
defer c.readLoopMutex.Unlock()
|
||||
|
||||
@@ -287,6 +291,11 @@ func (c *Client) Connect(ctx context.Context) error {
|
||||
func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, error) {
|
||||
peerID := messages.HashID(dstPeerID)
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
c.log.Infof("connect elapsed time: %v", time.Since(start))
|
||||
}()
|
||||
|
||||
c.mu.Lock()
|
||||
if !c.serviceIsRunning {
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface"
|
||||
@@ -252,7 +253,7 @@ func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}}
|
||||
c := &Client{log: log.WithField("relay", tt.name), relayConn: stubConn{remote: staticAddr{s: tt.s}}}
|
||||
got := c.ConnectedIP()
|
||||
var gotStr string
|
||||
if got.IsValid() {
|
||||
|
||||
@@ -78,6 +78,23 @@ type GrpcClient struct {
|
||||
// transport-alive but no longer delivering messages. It is the source of
|
||||
// truth IsHealthy reads, and is cleared once any frame is received again.
|
||||
receiveStalled atomic.Bool
|
||||
// receiveHandoffBlocked is set while the receive loop is parked handing a
|
||||
// message to a busy decryption worker. The loop stops calling Recv (and
|
||||
// markReceived) in that window, so the stream looks silent though it is
|
||||
// healthy. The watchdog reads this to avoid misreading self-inflicted
|
||||
// receive backpressure as a dead stream: reconnecting cannot help, since the
|
||||
// new stream feeds the same worker, and only triggers a reconnect storm.
|
||||
receiveHandoffBlocked atomic.Bool
|
||||
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
|
||||
// worker pulled off its queue. Diagnostic only: it lets a stall log show
|
||||
// whether the worker was draining (busy) or idle when the stream went silent.
|
||||
lastDecrypt atomic.Int64
|
||||
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
|
||||
// time the receive loop spent blocked handing messages to the worker. This is
|
||||
// time not spent reading the stream, so it quantifies receive backpressure.
|
||||
handoffWaitTotal atomic.Int64
|
||||
handoffWaitMax atomic.Int64
|
||||
handoffWaitCount atomic.Int64
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -353,6 +370,8 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
c.lastDecrypt.Store(time.Now().UnixNano())
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -439,6 +458,22 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastReceived.Load()))
|
||||
}
|
||||
|
||||
// idleSinceDecrypt returns how long since the worker last pulled a message.
|
||||
// Diagnostic only: distinguishes a busy/wedged worker from an idle one.
|
||||
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastDecrypt.Load()))
|
||||
}
|
||||
|
||||
// receiveAlive reports whether the receive stream shows liveness: it delivered a
|
||||
// frame within the inactivity threshold, or the receive loop is currently parked
|
||||
// handing a message to a busy decryption worker. In the latter case the loop has
|
||||
// stopped calling Recv, so the stream looks silent while being healthy, and
|
||||
// reconnecting would not help, so the watchdog must treat it as alive.
|
||||
func (c *GrpcClient) receiveAlive() bool {
|
||||
return c.idleSinceReceive() < receiveInactivityThreshold ||
|
||||
c.receiveHandoffBlocked.Load()
|
||||
}
|
||||
|
||||
// watchReceiveStream guards against a receive stream that is transport-alive but
|
||||
// no longer delivering messages. While the stream is idle past
|
||||
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
|
||||
@@ -450,18 +485,55 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
defer ticker.Stop()
|
||||
|
||||
var probeSentAt time.Time
|
||||
var holdLogged bool
|
||||
var statTicks int
|
||||
var lastStatTotal int64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if c.idleSinceReceive() < receiveInactivityThreshold {
|
||||
// Periodic backpressure summary so time lost to the worker handoff is
|
||||
// visible even when no stall fires. Emitted ~once a minute and only
|
||||
// when the wait grew, to stay quiet on a healthy stream.
|
||||
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
|
||||
statTicks = 0
|
||||
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
|
||||
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
|
||||
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
|
||||
max.Round(time.Millisecond), count)
|
||||
lastStatTotal = int64(total)
|
||||
}
|
||||
}
|
||||
|
||||
if c.receiveAlive() {
|
||||
// Attribute the case that matters in the field: silent past the
|
||||
// threshold but held because the receive loop is parked on the
|
||||
// worker handoff (backpressure), not a dead stream. Log once per
|
||||
// hold episode so a persistent worker stall is visible at info.
|
||||
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
|
||||
if !holdLogged {
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
holdLogged = true
|
||||
}
|
||||
} else {
|
||||
holdLogged = false
|
||||
}
|
||||
probeSentAt = time.Time{}
|
||||
continue
|
||||
}
|
||||
holdLogged = false
|
||||
|
||||
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
|
||||
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
c.receiveStalled.Store(true)
|
||||
c.notifyDisconnected(errReceiveStreamStalled)
|
||||
cancelStream()
|
||||
@@ -517,12 +589,37 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
|
||||
continue
|
||||
}
|
||||
|
||||
// The handoff blocks while the worker is busy, which parks this loop and
|
||||
// stops Recv. Flag it so the watchdog does not read the resulting silence
|
||||
// as a dead stream, and account the wait as receive backpressure.
|
||||
handoffStart := time.Now()
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.recordHandoffWait(time.Since(handoffStart))
|
||||
}
|
||||
}
|
||||
|
||||
// recordHandoffWait accumulates the time the receive loop was blocked handing a
|
||||
// message to the worker.
|
||||
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
|
||||
c.handoffWaitTotal.Add(int64(d))
|
||||
c.handoffWaitCount.Add(1)
|
||||
for {
|
||||
cur := c.handoffWaitMax.Load()
|
||||
if int64(d) <= cur || c.handoffWaitMax.CompareAndSwap(cur, int64(d)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handoffWaitStats returns cumulative receive-loop handoff backpressure.
|
||||
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
|
||||
return time.Duration(c.handoffWaitTotal.Load()), time.Duration(c.handoffWaitMax.Load()), c.handoffWaitCount.Load()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
|
||||
if c.decryptionWorker != nil {
|
||||
return
|
||||
|
||||
@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
|
||||
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
|
||||
// where a busy decryption worker parks the receive loop on the worker handoff,
|
||||
// so Recv (and markReceived) stops firing even though the stream is healthy.
|
||||
// With the receive stream silent past the inactivity threshold but the loop
|
||||
// blocked on handoff, the watchdog must consider the stream alive rather than
|
||||
// tear it down (reconnecting feeds the same worker and would not help).
|
||||
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
|
||||
c := &GrpcClient{}
|
||||
|
||||
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
|
||||
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
|
||||
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
|
||||
|
||||
// Receive stream silent but the loop is parked handing a message to a busy
|
||||
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
|
||||
|
||||
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.markReceived()
|
||||
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
|
||||
}
|
||||
|
||||
@@ -32,6 +32,13 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueLen returns the number of messages buffered for decryption. Diagnostic
|
||||
// only: a non-empty queue while the receive stream is silent indicates the
|
||||
// receive loop is parked on the handoff rather than the stream being dead.
|
||||
func (w *Worker) QueueLen() int {
|
||||
return len(w.encryptedMsgPool)
|
||||
}
|
||||
|
||||
func (w *Worker) Work(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user