mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-27 02:10:00 +00:00
Compare commits
3 Commits
fix/revert
...
feat/admin
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
520370a8b0 | ||
|
|
b5a16a1898 | ||
|
|
449b5cbb80 |
@@ -130,7 +130,7 @@ func debugConfigDump(cmd *cobra.Command, _ []string) error {
|
||||
|
||||
client := proto.NewDaemonServiceClient(conn)
|
||||
resp, err := client.GetConfig(cmd.Context(), &proto.GetConfigRequest{
|
||||
ProfileName: string(activeProf.ID),
|
||||
ProfileName: activeProf.Name,
|
||||
Username: currUser.Username,
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -41,7 +41,6 @@ type ICEBind struct {
|
||||
*wgConn.StdNetBind
|
||||
|
||||
transportNet transport.Net
|
||||
filterFn udpmux.FilterFn
|
||||
address wgaddr.Address
|
||||
mtu uint16
|
||||
|
||||
@@ -61,12 +60,11 @@ type ICEBind struct {
|
||||
ipv6Conn *net.UDPConn
|
||||
}
|
||||
|
||||
func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wgaddr.Address, mtu uint16) *ICEBind {
|
||||
func NewICEBind(transportNet transport.Net, address wgaddr.Address, mtu uint16) *ICEBind {
|
||||
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
|
||||
ib := &ICEBind{
|
||||
StdNetBind: b,
|
||||
transportNet: transportNet,
|
||||
filterFn: filterFn,
|
||||
address: address,
|
||||
mtu: mtu,
|
||||
endpoints: make(map[netip.Addr]net.Conn),
|
||||
@@ -265,7 +263,6 @@ func (s *ICEBind) createOrUpdateMux() {
|
||||
udpmux.UniversalUDPMuxParams{
|
||||
UDPConn: muxConn,
|
||||
Net: s.transportNet,
|
||||
FilterFn: s.filterFn,
|
||||
WGAddress: s.address,
|
||||
MTU: s.mtu,
|
||||
},
|
||||
|
||||
@@ -289,7 +289,7 @@ func setupICEBind(t *testing.T) *ICEBind {
|
||||
IP: netip.MustParseAddr("100.64.0.1"),
|
||||
Network: netip.MustParsePrefix("100.64.0.0/10"),
|
||||
}
|
||||
return NewICEBind(transportNet, nil, address, 1280)
|
||||
return NewICEBind(transportNet, address, 1280)
|
||||
}
|
||||
|
||||
func createDualStackConns(t *testing.T) (*net.UDPConn, *net.UDPConn) {
|
||||
|
||||
@@ -32,8 +32,6 @@ type TunKernelDevice struct {
|
||||
link *wgLink
|
||||
udpMuxConn net.PacketConn
|
||||
udpMux *udpmux.UniversalUDPMuxDefault
|
||||
|
||||
filterFn udpmux.FilterFn
|
||||
}
|
||||
|
||||
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, transportNet transport.Net) *TunKernelDevice {
|
||||
@@ -104,7 +102,6 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
|
||||
bindParams := udpmux.UniversalUDPMuxParams{
|
||||
UDPConn: nbnet.WrapPacketConn(rawSock),
|
||||
Net: t.transportNet,
|
||||
FilterFn: t.filterFn,
|
||||
WGAddress: t.address,
|
||||
MTU: t.mtu,
|
||||
}
|
||||
|
||||
@@ -63,7 +63,6 @@ type WGIFaceOpts struct {
|
||||
MTU uint16
|
||||
MobileArgs *device.MobileIFaceArguments
|
||||
TransportNet transport.Net
|
||||
FilterFn udpmux.FilterFn
|
||||
DisableDNS bool
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
|
||||
var tun WGTunDevice
|
||||
if netstack.IsEnabled() {
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
|
||||
if netstack.IsEnabled() {
|
||||
wgIFace := &WGIface{
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
|
||||
wgIFace := &WGIface{
|
||||
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, opts.MobileArgs.TunFd),
|
||||
|
||||
@@ -14,7 +14,7 @@ import (
|
||||
// NewWGIFace Creates a new WireGuard interface instance
|
||||
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
if netstack.IsEnabled() {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
return &WGIface{
|
||||
tun: device.NewNetstackDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr()),
|
||||
userspaceBind: true,
|
||||
@@ -30,7 +30,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
|
||||
}
|
||||
|
||||
if device.ModuleTunIsLoaded() {
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, opts.Address, opts.MTU)
|
||||
iceBind := bind.NewICEBind(opts.TransportNet, opts.Address, opts.MTU)
|
||||
return &WGIface{
|
||||
tun: device.NewTunDevice(opts.IFaceName, opts.Address, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind),
|
||||
userspaceBind: true,
|
||||
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -22,10 +20,6 @@ import (
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
)
|
||||
|
||||
// FilterFn is a function that filters out candidates based on the address.
|
||||
// If it returns true, the address is to be filtered. It also returns the prefix of matching route.
|
||||
type FilterFn func(address netip.Addr) (bool, netip.Prefix, error)
|
||||
|
||||
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn
|
||||
// It then passes packets to the UDPMux that does the actual connection muxing.
|
||||
type UniversalUDPMuxDefault struct {
|
||||
@@ -43,7 +37,6 @@ type UniversalUDPMuxParams struct {
|
||||
UDPConn net.PacketConn
|
||||
XORMappedAddrCacheTTL time.Duration
|
||||
Net transport.Net
|
||||
FilterFn FilterFn
|
||||
WGAddress wgaddr.Address
|
||||
MTU uint16
|
||||
}
|
||||
@@ -68,7 +61,6 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
|
||||
PacketConn: params.UDPConn,
|
||||
mux: m,
|
||||
logger: params.Logger,
|
||||
filterFn: params.FilterFn,
|
||||
address: params.WGAddress,
|
||||
}
|
||||
|
||||
@@ -115,15 +107,12 @@ func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// UDPConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets
|
||||
// UDPConn is a wrapper around UDPMux conn that overrides WriteTo to drop packets destined for the overlay subnet.
|
||||
type UDPConn struct {
|
||||
net.PacketConn
|
||||
mux *UniversalUDPMuxDefault
|
||||
logger logging.LeveledLogger
|
||||
filterFn FilterFn
|
||||
// TODO: reset cache on route changes
|
||||
addrCache sync.Map
|
||||
address wgaddr.Address
|
||||
mux *UniversalUDPMuxDefault
|
||||
logger logging.LeveledLogger
|
||||
address wgaddr.Address
|
||||
}
|
||||
|
||||
// GetPacketConn returns the underlying PacketConn
|
||||
@@ -132,65 +121,16 @@ func (u *UDPConn) GetPacketConn() net.PacketConn {
|
||||
}
|
||||
|
||||
func (u *UDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
|
||||
if u.filterFn == nil {
|
||||
udpAddr, ok := addr.(*net.UDPAddr)
|
||||
if !ok {
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
if isRouted, found := u.addrCache.Load(addr.String()); found {
|
||||
return u.handleCachedAddress(isRouted.(bool), b, addr)
|
||||
}
|
||||
|
||||
return u.handleUncachedAddress(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) handleCachedAddress(isRouted bool, b []byte, addr net.Addr) (int, error) {
|
||||
if isRouted {
|
||||
return 0, fmt.Errorf("address %s is part of a routed network, refusing to write", addr)
|
||||
}
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) handleUncachedAddress(b []byte, addr net.Addr) (int, error) {
|
||||
if err := u.performFilterCheck(addr); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
func (u *UDPConn) performFilterCheck(addr net.Addr) error {
|
||||
host, err := getHostFromAddr(addr)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to get host from address %s: %v", addr, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
a, err := netip.ParseAddr(host)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse address %s: %v", addr, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if u.address.Network.Contains(a) {
|
||||
dst := udpAddr.AddrPort().Addr().Unmap()
|
||||
if (u.address.Network.IsValid() && u.address.Network.Contains(dst)) || (u.address.IPv6Net.IsValid() && u.address.IPv6Net.Contains(dst)) {
|
||||
log.Warnf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
return fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
return 0, fmt.Errorf("address %s is part of the NetBird network %s, refusing to write", addr, u.address)
|
||||
}
|
||||
|
||||
if isRouted, prefix, err := u.filterFn(a); err != nil {
|
||||
log.Errorf("Failed to check if address %s is routed: %v", addr, err)
|
||||
} else {
|
||||
u.addrCache.Store(addr.String(), isRouted)
|
||||
if isRouted {
|
||||
// Extra log, as the error only shows up with ICE logging enabled
|
||||
log.Infof("address %s is part of routed network %s, refusing to write", addr, prefix)
|
||||
return fmt.Errorf("address %s is part of routed network %s, refusing to write", addr, prefix)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getHostFromAddr(addr net.Addr) (string, error) {
|
||||
host, _, err := net.SplitHostPort(addr.String())
|
||||
return host, err
|
||||
return u.PacketConn.WriteTo(b, addr)
|
||||
}
|
||||
|
||||
// GetSharedConn returns the shared udp conn
|
||||
@@ -225,6 +165,13 @@ func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.A
|
||||
return nil
|
||||
}
|
||||
|
||||
src := udpAddr.AddrPort().Addr().Unmap()
|
||||
wg := m.params.WGAddress
|
||||
if (wg.Network.IsValid() && wg.Network.Contains(src)) || (wg.IPv6Net.IsValid() && wg.IPv6Net.Contains(src)) {
|
||||
log.Debugf("dropping STUN message from overlay source %s", udpAddr)
|
||||
return nil
|
||||
}
|
||||
|
||||
if m.isXORMappedResponse(msg, udpAddr.String()) {
|
||||
err := m.handleXORMappedResponse(udpAddr, msg)
|
||||
if err != nil {
|
||||
|
||||
@@ -66,7 +66,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
|
||||
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
|
||||
endpointAddress := &net.UDPAddr{
|
||||
IP: net.IPv4(10, 0, 0, 1),
|
||||
Port: 1234,
|
||||
|
||||
@@ -22,7 +22,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
|
||||
iceBind := bind.NewICEBind(nil, wgAddress, 1280)
|
||||
endpointAddress := &net.UDPAddr{
|
||||
IP: net.IPv4(10, 0, 0, 1),
|
||||
Port: 1234,
|
||||
|
||||
@@ -51,20 +51,13 @@ type cachedRecord struct {
|
||||
}
|
||||
|
||||
// Resolver caches critical NetBird infrastructure domains.
|
||||
// records, refreshing, failedResolves, mgmtDomain and serverDomains are all
|
||||
// guarded by mutex.
|
||||
// records, refreshing, mgmtDomain and serverDomains are all guarded by mutex.
|
||||
type Resolver struct {
|
||||
records map[dns.Question]*cachedRecord
|
||||
mgmtDomain *domain.Domain
|
||||
serverDomains *dnsconfig.ServerDomains
|
||||
mutex sync.RWMutex
|
||||
|
||||
// failedResolves records the last failed initial resolve per domain so a
|
||||
// domain that never resolves isn't retried on every server-domains update
|
||||
// until refreshBackoff elapses. Entries are cleared on success and pruned
|
||||
// to the current server-domains set.
|
||||
failedResolves map[domain.Domain]time.Time
|
||||
|
||||
chain ChainResolver
|
||||
chainMaxPriority int
|
||||
refreshGroup singleflight.Group
|
||||
@@ -83,10 +76,9 @@ type Resolver struct {
|
||||
// NewResolver creates a new management domains cache resolver.
|
||||
func NewResolver() *Resolver {
|
||||
return &Resolver{
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
failedResolves: make(map[domain.Domain]time.Time),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,9 +173,7 @@ func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
|
||||
|
||||
// AddDomain resolves a domain and stores its A/AAAA records in the cache.
|
||||
// A family that resolves NODATA (nil err, zero records) evicts any stale
|
||||
// entry for that qtype. When one family hard-errors while the other succeeds,
|
||||
// the resolved family is still cached but AddDomain returns an error so the
|
||||
// caller retries the incomplete resolve rather than treating it as complete.
|
||||
// entry for that qtype.
|
||||
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
@@ -213,10 +203,6 @@ func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
log.Debugf("added/updated domain=%s with %d A records and %d AAAA records",
|
||||
d.SafeString(), len(aRecords), len(aaaaRecords))
|
||||
|
||||
if errA != nil || errAAAA != nil {
|
||||
return fmt.Errorf("resolve %s: incomplete, a family failed: %w", d.SafeString(), errors.Join(errA, errAAAA))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -476,7 +462,6 @@ func (m *Resolver) RemoveDomain(d domain.Domain) error {
|
||||
delete(m.records, qAAAA)
|
||||
delete(m.refreshing, qA)
|
||||
delete(m.refreshing, qAAAA)
|
||||
delete(m.failedResolves, d)
|
||||
|
||||
log.Debugf("removed domain=%s from cache", d.SafeString())
|
||||
return nil
|
||||
@@ -520,7 +505,6 @@ func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dn
|
||||
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
|
||||
currentDomains := m.GetCachedDomains()
|
||||
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
|
||||
m.pruneFailedResolves(allDomains)
|
||||
}
|
||||
|
||||
m.addNewDomains(ctx, newDomains)
|
||||
@@ -593,85 +577,13 @@ func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
|
||||
return m.mgmtDomain != nil && domain == *m.mgmtDomain
|
||||
}
|
||||
|
||||
// addNewDomains resolves and caches domains that are not yet in the cache,
|
||||
// running the lookups concurrently. Domains already cached are skipped and left
|
||||
// to the stale-while-revalidate refresh path, so a sync never re-resolves them
|
||||
// synchronously: once NetBird owns the OS resolver the resolve runs through the
|
||||
// handler chain and would otherwise dial the managed upstreams under the engine
|
||||
// sync lock on every update.
|
||||
// addNewDomains resolves and caches all domains from the update
|
||||
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
|
||||
var wg sync.WaitGroup
|
||||
seen := make(map[domain.Domain]struct{}, len(newDomains))
|
||||
for _, newDomain := range newDomains {
|
||||
if _, dup := seen[newDomain]; dup {
|
||||
continue
|
||||
}
|
||||
seen[newDomain] = struct{}{}
|
||||
|
||||
if !m.needsResolve(newDomain) {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(d domain.Domain) {
|
||||
defer wg.Done()
|
||||
if err := m.AddDomain(ctx, d); err != nil {
|
||||
m.markResolveFailed(d)
|
||||
log.Warnf("failed to add/update domain=%s: %v", d.SafeString(), err)
|
||||
return
|
||||
}
|
||||
m.clearResolveFailed(d)
|
||||
log.Debugf("added/updated management cache domain=%s", d.SafeString())
|
||||
}(newDomain)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// needsResolve reports whether d should be resolved now. A recent failed or
|
||||
// incomplete resolve gates retries on the backoff even when one family is
|
||||
// already cached, so a transiently-failed family is retried instead of being
|
||||
// treated as fully resolved. Otherwise a domain with any cached record is left
|
||||
// to the stale-while-revalidate refresh path.
|
||||
func (m *Resolver) needsResolve(d domain.Domain) bool {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if failedAt, ok := m.failedResolves[d]; ok {
|
||||
return time.Since(failedAt) >= refreshBackoff
|
||||
}
|
||||
|
||||
for _, qtype := range []uint16{dns.TypeA, dns.TypeAAAA} {
|
||||
q := dns.Question{Name: dnsName, Qtype: qtype, Qclass: dns.ClassINET}
|
||||
if _, ok := m.records[q]; ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Resolver) markResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
m.failedResolves[d] = time.Now()
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (m *Resolver) clearResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
delete(m.failedResolves, d)
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
// pruneFailedResolves drops failure markers for domains no longer present in
|
||||
// the server-domains set, keeping the map bounded to the current set (a
|
||||
// failed-only domain has no cached record, so RemoveDomain never sees it).
|
||||
func (m *Resolver) pruneFailedResolves(domains domain.List) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
for d := range m.failedResolves {
|
||||
if !slices.Contains(domains, d) {
|
||||
delete(m.failedResolves, d)
|
||||
if err := m.AddDomain(ctx, newDomain); err != nil {
|
||||
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
|
||||
} else {
|
||||
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ type fakeChain struct {
|
||||
mu sync.Mutex
|
||||
calls map[string]int
|
||||
answers map[string][]dns.RR
|
||||
qErr map[string]error
|
||||
err error
|
||||
hasRoot bool
|
||||
onLookup func()
|
||||
@@ -31,7 +30,6 @@ func newFakeChain() *fakeChain {
|
||||
return &fakeChain{
|
||||
calls: map[string]int{},
|
||||
answers: map[string][]dns.RR{},
|
||||
qErr: map[string]error{},
|
||||
hasRoot: true,
|
||||
}
|
||||
}
|
||||
@@ -49,9 +47,6 @@ func (f *fakeChain) ResolveInternal(ctx context.Context, msg *dns.Msg, maxPriori
|
||||
f.calls[key]++
|
||||
answers := f.answers[key]
|
||||
err := f.err
|
||||
if err == nil {
|
||||
err = f.qErr[key]
|
||||
}
|
||||
onLookup := f.onLookup
|
||||
f.mu.Unlock()
|
||||
|
||||
@@ -80,12 +75,6 @@ func (f *fakeChain) setAnswer(name string, qtype uint16, ip string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeChain) setErr(name string, qtype uint16, err error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.qErr[name+"|"+dns.TypeToString[qtype]] = err
|
||||
}
|
||||
|
||||
func (f *fakeChain) callCount(name string, qtype uint16) int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
@@ -1,183 +0,0 @@
|
||||
package mgmt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||
"github.com/netbirdio/netbird/shared/management/domain"
|
||||
)
|
||||
|
||||
// A domain already in the cache must not be re-resolved on a subsequent server
|
||||
// domains update; it is left to the stale-while-revalidate refresh path.
|
||||
func TestResolver_UpdateFromServerDomains_SkipsCached(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("signal.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must resolve the domain")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"cached domain must not be re-resolved on a subsequent update")
|
||||
}
|
||||
|
||||
// New domains in a single update must resolve concurrently rather than serially.
|
||||
func TestResolver_AddNewDomains_ResolvesConcurrently(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
|
||||
var inflight, maxInflight atomic.Int32
|
||||
chain.onLookup = func() {
|
||||
n := inflight.Add(1)
|
||||
for {
|
||||
old := maxInflight.Load()
|
||||
if n <= old || maxInflight.CompareAndSwap(old, n) {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
inflight.Add(-1)
|
||||
}
|
||||
|
||||
relays := []domain.Domain{"a.example.com", "b.example.com", "c.example.com", "d.example.com"}
|
||||
for _, d := range relays {
|
||||
chain.setAnswer(dns.Fqdn(string(d)), dns.TypeA, "10.0.0.2")
|
||||
}
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
start := time.Now()
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: relays})
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
assert.GreaterOrEqual(t, int(maxInflight.Load()), 2, "domains must resolve concurrently")
|
||||
// Serial resolution of 4 domains would take at least 4*50ms; concurrent is far less.
|
||||
assert.Less(t, elapsed, 300*time.Millisecond, "resolution should not be serial")
|
||||
}
|
||||
|
||||
// A domain that fails to resolve must not be retried on every update; the
|
||||
// failure backoff suppresses re-resolution until it expires.
|
||||
func TestResolver_UpdateFromServerDomains_BacksOffFailures(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must attempt the resolve")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"failed resolve must back off and not retry on the next update")
|
||||
}
|
||||
|
||||
// A domain listed under more than one server-domain type (e.g. STUN and TURN on
|
||||
// the same host) must be resolved once per update, not once per occurrence.
|
||||
func TestResolver_AddNewDomains_DedupesDuplicateDomains(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("dup.example.com.", dns.TypeA, "10.0.0.9")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{
|
||||
Stuns: []domain.Domain{"dup.example.com"},
|
||||
Turns: []domain.Domain{"dup.example.com"},
|
||||
}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("dup.example.com.", dns.TypeA),
|
||||
"a domain appearing under multiple server-domain types must resolve once")
|
||||
}
|
||||
|
||||
// A failure marker must be dropped once its domain leaves the server-domains set
|
||||
// so the map stays bounded to the current set.
|
||||
func TestResolver_UpdateFromServerDomains_PrunesFailedResolves(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("gone.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, marked, "failed resolve must be recorded")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("other.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, stillMarked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, stillMarked, "failure marker for a domain no longer in the set must be pruned")
|
||||
}
|
||||
|
||||
// When one family hard-errors while the other resolves, the domain is cached
|
||||
// for the working family but recorded as incomplete so the failed family is
|
||||
// retried under backoff instead of being treated as fully resolved forever.
|
||||
func TestResolver_AddNewDomains_RetriesPartialFamilyFailure(t *testing.T) {
|
||||
d := domain.Domain("relay.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("relay.example.com.", dns.TypeA, "10.0.0.2")
|
||||
chain.setErr("relay.example.com.", dns.TypeAAAA, errors.New("servfail"))
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, aCached := r.records[dns.Question{Name: "relay.example.com.", Qtype: dns.TypeA, Qclass: dns.ClassINET}]
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, aCached, "the working family must still be cached")
|
||||
require.True(t, marked, "a partial failure must be recorded so the failed family is retried")
|
||||
|
||||
assert.False(t, r.needsResolve(d), "within the backoff window the domain is not retried")
|
||||
|
||||
r.mutex.Lock()
|
||||
r.failedResolves[d] = time.Now().Add(-2 * refreshBackoff)
|
||||
r.mutex.Unlock()
|
||||
assert.True(t, r.needsResolve(d), "after the backoff elapses the domain is retried to pick up the missing family")
|
||||
}
|
||||
|
||||
// A family that returns NODATA (legitimately absent, e.g. an IPv4-only host) is
|
||||
// not a failure: the domain must not be marked for retry, otherwise it would be
|
||||
// re-resolved on every sync.
|
||||
func TestResolver_AddNewDomains_NodataIsNotFailure(t *testing.T) {
|
||||
d := domain.Domain("v4only.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("v4only.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, marked, "a NODATA family must not be recorded as a failure")
|
||||
assert.False(t, r.needsResolve(d), "an IPv4-only host must not be re-resolved on later syncs")
|
||||
}
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
@@ -54,7 +53,6 @@ import (
|
||||
"github.com/netbirdio/netbird/client/internal/relay"
|
||||
"github.com/netbirdio/netbird/client/internal/rosenpass"
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager"
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
||||
"github.com/netbirdio/netbird/client/internal/statemanager"
|
||||
"github.com/netbirdio/netbird/client/internal/syncstore"
|
||||
"github.com/netbirdio/netbird/client/internal/updater"
|
||||
@@ -90,13 +88,6 @@ var ErrResetConnection = fmt.Errorf("reset connection")
|
||||
|
||||
var ErrEngineAlreadyStarted = errors.New("engine already started")
|
||||
|
||||
// engineRestartCount and engineLastRestart track client-restart cadence across
|
||||
// engine recreations so a restart loop is distinguishable from rare restarts.
|
||||
var (
|
||||
engineRestartCount atomic.Int64
|
||||
engineLastRestart atomic.Int64
|
||||
)
|
||||
|
||||
type EngineConfig struct {
|
||||
WgPort int
|
||||
WgIfaceName string
|
||||
@@ -908,11 +899,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(started)
|
||||
if update.GetNetworkMap() != nil {
|
||||
log.Infof("sync finished in %s, %d", duration, update.GetNetworkMap().GetSerial())
|
||||
} else {
|
||||
log.Infof("sync finished in %s", duration)
|
||||
}
|
||||
log.Infof("sync finished in %s", duration)
|
||||
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
||||
}()
|
||||
e.syncMsgMux.Lock()
|
||||
@@ -922,23 +909,14 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if e.ctx.Err() != nil {
|
||||
return e.ctx.Err()
|
||||
}
|
||||
serial := update.GetNetworkMap().GetSerial()
|
||||
if nm := update.GetNetworkMap(); nm != nil {
|
||||
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
|
||||
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
|
||||
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
|
||||
} else {
|
||||
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
|
||||
}
|
||||
|
||||
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
startTime := time.Now()
|
||||
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
@@ -949,21 +927,17 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if nm == nil {
|
||||
return nil
|
||||
}
|
||||
startTime = time.Now()
|
||||
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
e.persistSyncResponse(update)
|
||||
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
startTime = time.Now()
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
||||
|
||||
@@ -1383,56 +1357,44 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
startTime := time.Now()
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
startTime = time.Now()
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// lazy mgr needs to be aware of which routes are available before they are applied
|
||||
if e.connMgr != nil {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
// Ingress forward rules
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
startTime = time.Now()
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
@@ -1450,24 +1412,20 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
startTime = time.Now()
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
@@ -1480,11 +1438,9 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
@@ -1999,7 +1955,6 @@ func (e *Engine) newWgIface() (*iface.WGIface, error) {
|
||||
WGPrivKey: e.config.WgPrivateKey.String(),
|
||||
MTU: e.config.MTU,
|
||||
TransportNet: transportNet,
|
||||
FilterFn: e.addrViaRoutes,
|
||||
DisableDNS: e.config.DisableDNS,
|
||||
}
|
||||
|
||||
@@ -2216,14 +2171,7 @@ func (e *Engine) triggerClientRestart() {
|
||||
return
|
||||
}
|
||||
|
||||
// Cadence survives engine recreation (package-level), so a restart loop shows
|
||||
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
|
||||
n := engineRestartCount.Add(1)
|
||||
var sinceLast time.Duration
|
||||
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
|
||||
sinceLast = time.Since(time.Unix(0, prev))
|
||||
}
|
||||
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
|
||||
log.Info("restarting engine")
|
||||
CtxGetState(e.ctx).Set(StatusConnecting)
|
||||
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
|
||||
log.Infof("cancelling client context, engine will be recreated")
|
||||
@@ -2254,21 +2202,6 @@ func (e *Engine) startNetworkMonitor() {
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *Engine) addrViaRoutes(addr netip.Addr) (bool, netip.Prefix, error) {
|
||||
var vpnRoutes []netip.Prefix
|
||||
for _, routes := range e.routeManager.GetClientRoutes() {
|
||||
if len(routes) > 0 && routes[0] != nil {
|
||||
vpnRoutes = append(vpnRoutes, routes[0].Network)
|
||||
}
|
||||
}
|
||||
|
||||
if isVpn, prefix := systemops.IsAddrRouted(addr, vpnRoutes); isVpn {
|
||||
return true, prefix, nil
|
||||
}
|
||||
|
||||
return false, netip.Prefix{}, nil
|
||||
}
|
||||
|
||||
func (e *Engine) stopDNSServer() {
|
||||
if e.dnsServer == nil {
|
||||
return
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"runtime/debug"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -193,7 +192,6 @@ func (s *StatusChangeSubscription) Events() chan map[string]RouterState {
|
||||
// Pure read methods take RLock; anything that mutates state takes Lock.
|
||||
type Status struct {
|
||||
mux sync.RWMutex
|
||||
muxRelays sync.RWMutex
|
||||
peers map[string]State
|
||||
ipToKey map[string]string
|
||||
changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription
|
||||
@@ -228,8 +226,6 @@ type Status struct {
|
||||
|
||||
routeIDLookup routeIDLookup
|
||||
wgIface WGIfaceStatus
|
||||
|
||||
profile *StatusProfile
|
||||
}
|
||||
|
||||
// NewRecorder returns a new Status instance
|
||||
@@ -244,23 +240,16 @@ func NewRecorder(mgmAddress string) *Status {
|
||||
notifier: newNotifier(),
|
||||
mgmAddress: mgmAddress,
|
||||
resolvedDomainsStates: map[domain.Domain]ResolvedDomainInfo{},
|
||||
profile: NewStatusProfile(context.Background()),
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Status) StartProfile(ctx context.Context) {
|
||||
d.profile.Start(ctx)
|
||||
}
|
||||
|
||||
func (d *Status) SetRelayMgr(manager *relayClient.Manager) {
|
||||
d.profile.inc("SetRelayMgr")
|
||||
d.muxRelays.Lock()
|
||||
defer d.muxRelays.Unlock()
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.relayMgr = manager
|
||||
}
|
||||
|
||||
func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
|
||||
d.profile.inc("SetIngressGwMgr")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.ingressGwMgr = ingressGwMgr
|
||||
@@ -268,7 +257,6 @@ func (d *Status) SetIngressGwMgr(ingressGwMgr *ingressgw.Manager) {
|
||||
|
||||
// ReplaceOfflinePeers replaces
|
||||
func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
||||
d.profile.inc("ReplaceOfflinePeers")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.offlinePeers = make([]State, len(replacement))
|
||||
@@ -280,7 +268,6 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
||||
|
||||
// AddPeer adds peer to Daemon status map
|
||||
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string) error {
|
||||
d.profile.inc("AddPeer")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -308,7 +295,6 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string, ipv6 string)
|
||||
|
||||
// GetPeer adds peer to Daemon status map
|
||||
func (d *Status) GetPeer(peerPubKey string) (State, error) {
|
||||
d.profile.inc("GetPeer")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -320,7 +306,6 @@ func (d *Status) GetPeer(peerPubKey string) (State, error) {
|
||||
}
|
||||
|
||||
func (d *Status) PeerByIP(ip string) (string, bool) {
|
||||
d.profile.inc("PeerByIP")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -338,7 +323,6 @@ func (d *Status) PeerByIP(ip string) (string, bool) {
|
||||
// active peers are matched; peers moved into the offline slice by
|
||||
// ReplaceOfflinePeers are intentionally treated as unknown.
|
||||
func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
d.profile.inc("PeerStateByIP")
|
||||
if ip == "" {
|
||||
return State{}, false
|
||||
}
|
||||
@@ -357,7 +341,6 @@ func (d *Status) PeerStateByIP(ip string) (State, bool) {
|
||||
|
||||
// RemovePeer removes peer from Daemon status map
|
||||
func (d *Status) RemovePeer(peerPubKey string) error {
|
||||
d.profile.inc("RemovePeer")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -379,7 +362,6 @@ func (d *Status) RemovePeer(peerPubKey string) error {
|
||||
|
||||
// UpdatePeerState updates peer status
|
||||
func (d *Status) UpdatePeerState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -422,7 +404,6 @@ func (d *Status) UpdatePeerState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error {
|
||||
d.profile.inc("AddPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[peer]
|
||||
@@ -448,7 +429,6 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R
|
||||
}
|
||||
|
||||
func (d *Status) RemovePeerStateRoute(peer string, route string) error {
|
||||
d.profile.inc("RemovePeerStateRoute")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[peer]
|
||||
@@ -479,13 +459,11 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) {
|
||||
if d == nil {
|
||||
return nil, false
|
||||
}
|
||||
d.profile.inc("CheckRoutes")
|
||||
resId, isExitNode := d.routeIDLookup.Lookup(ip)
|
||||
return []byte(resId), isExitNode
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerICEState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerICEState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -525,7 +503,6 @@ func (d *Status) UpdatePeerICEState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerRelayedState")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -562,7 +539,6 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error {
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerRelayedStateToDisconnected")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -598,7 +574,6 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error
|
||||
}
|
||||
|
||||
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
|
||||
d.profile.inc("UpdatePeerICEStateToDisconnected")
|
||||
d.mux.Lock()
|
||||
|
||||
peerState, ok := d.peers[receivedState.PubKey]
|
||||
@@ -638,7 +613,6 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
|
||||
|
||||
// UpdateWireGuardPeerState updates the WireGuard bits of the peer state
|
||||
func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats configurer.WGStats) error {
|
||||
d.profile.inc("UpdateWireGuardPeerState")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -666,7 +640,6 @@ func hasConnStatusChanged(oldStatus, newStatus ConnStatus) bool {
|
||||
|
||||
// UpdatePeerFQDN update peer's state fqdn only
|
||||
func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
||||
d.profile.inc("UpdatePeerFQDN")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -683,7 +656,6 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error {
|
||||
|
||||
// UpdatePeerSSHHostKey updates peer's SSH host key
|
||||
func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) error {
|
||||
d.profile.inc("UpdatePeerSSHHostKey")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -700,7 +672,6 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro
|
||||
|
||||
// FinishPeerListModifications this event invoke the notification
|
||||
func (d *Status) FinishPeerListModifications() {
|
||||
d.profile.inc("FinishPeerListModifications")
|
||||
d.mux.Lock()
|
||||
|
||||
if !d.peerListChangedForNotification {
|
||||
@@ -733,7 +704,6 @@ func (d *Status) FinishPeerListModifications() {
|
||||
}
|
||||
|
||||
func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription {
|
||||
d.profile.inc("SubscribeToPeerStateChanges")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -747,7 +717,6 @@ func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string)
|
||||
}
|
||||
|
||||
func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) {
|
||||
d.profile.inc("UnsubscribePeerStateChanges")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -773,7 +742,6 @@ func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscript
|
||||
|
||||
// GetLocalPeerState returns the local peer state
|
||||
func (d *Status) GetLocalPeerState() LocalPeerState {
|
||||
d.profile.inc("GetLocalPeerState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return d.localPeer.Clone()
|
||||
@@ -781,7 +749,6 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
|
||||
|
||||
// UpdateLocalPeerState updates local peer status
|
||||
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
|
||||
d.profile.inc("UpdateLocalPeerState")
|
||||
d.mux.Lock()
|
||||
d.localPeer = localPeerState
|
||||
fqdn := d.localPeer.FQDN
|
||||
@@ -796,7 +763,6 @@ func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
|
||||
|
||||
// AddLocalPeerStateRoute adds a route to the local peer state
|
||||
func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
|
||||
d.profile.inc("AddLocalPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -814,7 +780,6 @@ func (d *Status) AddLocalPeerStateRoute(route string, resourceId route.ResID) {
|
||||
|
||||
// RemoveLocalPeerStateRoute removes a route from the local peer state
|
||||
func (d *Status) RemoveLocalPeerStateRoute(route string) {
|
||||
d.profile.inc("RemoveLocalPeerStateRoute")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -828,7 +793,6 @@ func (d *Status) RemoveLocalPeerStateRoute(route string) {
|
||||
|
||||
// AddResolvedIPLookupEntry adds a resolved IP lookup entry
|
||||
func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.ResID) {
|
||||
d.profile.inc("AddResolvedIPLookupEntry")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -837,7 +801,6 @@ func (d *Status) AddResolvedIPLookupEntry(prefix netip.Prefix, resourceId route.
|
||||
|
||||
// RemoveResolvedIPLookupEntry removes a resolved IP lookup entry
|
||||
func (d *Status) RemoveResolvedIPLookupEntry(route string) {
|
||||
d.profile.inc("RemoveResolvedIPLookupEntry")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -849,7 +812,6 @@ func (d *Status) RemoveResolvedIPLookupEntry(route string) {
|
||||
|
||||
// CleanLocalPeerStateRoutes cleans all routes from the local peer state
|
||||
func (d *Status) CleanLocalPeerStateRoutes() {
|
||||
d.profile.inc("CleanLocalPeerStateRoutes")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -858,7 +820,6 @@ func (d *Status) CleanLocalPeerStateRoutes() {
|
||||
|
||||
// CleanLocalPeerState cleans local peer status
|
||||
func (d *Status) CleanLocalPeerState() {
|
||||
d.profile.inc("CleanLocalPeerState")
|
||||
d.mux.Lock()
|
||||
d.localPeer = LocalPeerState{}
|
||||
fqdn := d.localPeer.FQDN
|
||||
@@ -870,7 +831,6 @@ func (d *Status) CleanLocalPeerState() {
|
||||
|
||||
// MarkManagementDisconnected sets ManagementState to disconnected
|
||||
func (d *Status) MarkManagementDisconnected(err error) {
|
||||
d.profile.inc("MarkManagementDisconnected")
|
||||
d.mux.Lock()
|
||||
d.managementState = false
|
||||
d.managementError = err
|
||||
@@ -883,7 +843,6 @@ func (d *Status) MarkManagementDisconnected(err error) {
|
||||
|
||||
// MarkManagementConnected sets ManagementState to connected
|
||||
func (d *Status) MarkManagementConnected() {
|
||||
d.profile.inc("MarkManagementConnected")
|
||||
d.mux.Lock()
|
||||
d.managementState = true
|
||||
d.managementError = nil
|
||||
@@ -896,7 +855,6 @@ func (d *Status) MarkManagementConnected() {
|
||||
|
||||
// UpdateSignalAddress update the address of the signal server
|
||||
func (d *Status) UpdateSignalAddress(signalURL string) {
|
||||
d.profile.inc("UpdateSignalAddress")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.signalAddress = signalURL
|
||||
@@ -904,7 +862,6 @@ func (d *Status) UpdateSignalAddress(signalURL string) {
|
||||
|
||||
// UpdateManagementAddress update the address of the management server
|
||||
func (d *Status) UpdateManagementAddress(mgmAddress string) {
|
||||
d.profile.inc("UpdateManagementAddress")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.mgmAddress = mgmAddress
|
||||
@@ -912,7 +869,6 @@ func (d *Status) UpdateManagementAddress(mgmAddress string) {
|
||||
|
||||
// UpdateRosenpass update the Rosenpass configuration
|
||||
func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
|
||||
d.profile.inc("UpdateRosenpass")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.rosenpassPermissive = rosenpassPermissive
|
||||
@@ -920,7 +876,6 @@ func (d *Status) UpdateRosenpass(rosenpassEnabled, rosenpassPermissive bool) {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateLazyConnection(enabled bool) {
|
||||
d.profile.inc("UpdateLazyConnection")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.lazyConnectionEnabled = enabled
|
||||
@@ -928,7 +883,6 @@ func (d *Status) UpdateLazyConnection(enabled bool) {
|
||||
|
||||
// MarkSignalDisconnected sets SignalState to disconnected
|
||||
func (d *Status) MarkSignalDisconnected(err error) {
|
||||
d.profile.inc("MarkSignalDisconnected")
|
||||
d.mux.Lock()
|
||||
d.signalState = false
|
||||
d.signalError = err
|
||||
@@ -941,7 +895,6 @@ func (d *Status) MarkSignalDisconnected(err error) {
|
||||
|
||||
// MarkSignalConnected sets SignalState to connected
|
||||
func (d *Status) MarkSignalConnected() {
|
||||
d.profile.inc("MarkSignalConnected")
|
||||
d.mux.Lock()
|
||||
d.signalState = true
|
||||
d.signalError = nil
|
||||
@@ -953,21 +906,18 @@ func (d *Status) MarkSignalConnected() {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) {
|
||||
d.profile.inc("UpdateRelayStates")
|
||||
d.muxRelays.Lock()
|
||||
defer d.muxRelays.Unlock()
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.relayStates = relayResults
|
||||
}
|
||||
|
||||
func (d *Status) UpdateDNSStates(dnsStates []NSGroupState) {
|
||||
d.profile.inc("UpdateDNSStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
d.nsGroupStates = dnsStates
|
||||
}
|
||||
|
||||
func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix, resourceId route.ResID) {
|
||||
d.profile.inc("UpdateResolvedDomainsStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -983,7 +933,6 @@ func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resol
|
||||
}
|
||||
|
||||
func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
|
||||
d.profile.inc("DeleteResolvedDomainsStates")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -1000,7 +949,6 @@ func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) {
|
||||
}
|
||||
|
||||
func (d *Status) GetRosenpassState() RosenpassState {
|
||||
d.profile.inc("GetRosenpassState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return RosenpassState{
|
||||
@@ -1010,14 +958,12 @@ func (d *Status) GetRosenpassState() RosenpassState {
|
||||
}
|
||||
|
||||
func (d *Status) GetLazyConnection() bool {
|
||||
d.profile.inc("GetLazyConnection")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return d.lazyConnectionEnabled
|
||||
}
|
||||
|
||||
func (d *Status) GetManagementState() ManagementState {
|
||||
d.profile.inc("GetManagementState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return ManagementState{
|
||||
@@ -1028,7 +974,6 @@ func (d *Status) GetManagementState() ManagementState {
|
||||
}
|
||||
|
||||
func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
|
||||
d.profile.inc("UpdateLatency")
|
||||
if latency <= 0 {
|
||||
return nil
|
||||
}
|
||||
@@ -1046,7 +991,6 @@ func (d *Status) UpdateLatency(pubKey string, latency time.Duration) error {
|
||||
|
||||
// IsLoginRequired determines if a peer's login has expired.
|
||||
func (d *Status) IsLoginRequired() bool {
|
||||
d.profile.inc("IsLoginRequired")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -1063,7 +1007,6 @@ func (d *Status) IsLoginRequired() bool {
|
||||
}
|
||||
|
||||
func (d *Status) GetSignalState() SignalState {
|
||||
d.profile.inc("GetSignalState")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return SignalState{
|
||||
@@ -1075,34 +1018,24 @@ func (d *Status) GetSignalState() SignalState {
|
||||
|
||||
// GetRelayStates returns the stun/turn/permanent relay states
|
||||
func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
d.profile.inc("GetRelayStates")
|
||||
d.muxRelays.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("GetRelayStates", started)
|
||||
}()
|
||||
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
if d.relayMgr == nil {
|
||||
defer d.muxRelays.RUnlock()
|
||||
return d.relayStates
|
||||
}
|
||||
|
||||
relayMgr := d.relayMgr
|
||||
// extend the list of stun, turn servers with the relay server connections
|
||||
relayStates := slices.Clone(d.relayStates)
|
||||
d.muxRelays.RUnlock()
|
||||
|
||||
states := relayMgr.RelayStates()
|
||||
states := d.relayMgr.RelayStates()
|
||||
if len(states) == 0 {
|
||||
// no relay connection tracked yet; surface configured servers as
|
||||
// unavailable with the real reconnect error when known
|
||||
err := relayClient.ErrRelayClientNotConnected
|
||||
if connErr := relayMgr.RelayConnectError(); connErr != nil {
|
||||
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
|
||||
err = connErr
|
||||
}
|
||||
for _, r := range relayMgr.ServerURLs() {
|
||||
for _, r := range d.relayMgr.ServerURLs() {
|
||||
relayStates = append(relayStates, relay.ProbeResult{
|
||||
URI: r,
|
||||
Err: err,
|
||||
@@ -1122,15 +1055,7 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
}
|
||||
|
||||
func (d *Status) ForwardingRules() []firewall.ForwardRule {
|
||||
d.profile.inc("ForwardingRules")
|
||||
d.mux.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("ForwardingRules", started)
|
||||
}()
|
||||
|
||||
defer d.mux.RUnlock()
|
||||
if d.ingressGwMgr == nil {
|
||||
return nil
|
||||
@@ -1140,7 +1065,6 @@ func (d *Status) ForwardingRules() []firewall.ForwardRule {
|
||||
}
|
||||
|
||||
func (d *Status) GetDNSStates() []NSGroupState {
|
||||
d.profile.inc("GetDNSStates")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
|
||||
@@ -1149,7 +1073,6 @@ func (d *Status) GetDNSStates() []NSGroupState {
|
||||
}
|
||||
|
||||
func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo {
|
||||
d.profile.inc("GetResolvedDomainsStates")
|
||||
d.mux.RLock()
|
||||
defer d.mux.RUnlock()
|
||||
return maps.Clone(d.resolvedDomainsStates)
|
||||
@@ -1157,7 +1080,6 @@ func (d *Status) GetResolvedDomainsStates() map[domain.Domain]ResolvedDomainInfo
|
||||
|
||||
// GetFullStatus gets full status
|
||||
func (d *Status) GetFullStatus() FullStatus {
|
||||
d.profile.inc("GetFullStatus")
|
||||
fullStatus := FullStatus{
|
||||
ManagementState: d.GetManagementState(),
|
||||
SignalState: d.GetSignalState(),
|
||||
@@ -1184,31 +1106,26 @@ func (d *Status) GetFullStatus() FullStatus {
|
||||
|
||||
// ClientStart will notify all listeners about the new service state
|
||||
func (d *Status) ClientStart() {
|
||||
d.profile.inc("ClientStart")
|
||||
d.notifier.clientStart()
|
||||
}
|
||||
|
||||
// ClientStop will notify all listeners about the new service state
|
||||
func (d *Status) ClientStop() {
|
||||
d.profile.inc("ClientStop")
|
||||
d.notifier.clientStop()
|
||||
}
|
||||
|
||||
// ClientTeardown will notify all listeners about the service is under teardown
|
||||
func (d *Status) ClientTeardown() {
|
||||
d.profile.inc("ClientTeardown")
|
||||
d.notifier.clientTearDown()
|
||||
}
|
||||
|
||||
// SetConnectionListener set a listener to the notifier
|
||||
func (d *Status) SetConnectionListener(listener Listener) {
|
||||
d.profile.inc("SetConnectionListener")
|
||||
d.notifier.setListener(listener)
|
||||
}
|
||||
|
||||
// RemoveConnectionListener remove the listener from the notifier
|
||||
func (d *Status) RemoveConnectionListener() {
|
||||
d.profile.inc("RemoveConnectionListener")
|
||||
d.notifier.removeListener()
|
||||
}
|
||||
|
||||
@@ -1280,7 +1197,6 @@ func (d *Status) PublishEvent(
|
||||
userMsg string,
|
||||
metadata map[string]string,
|
||||
) {
|
||||
d.profile.inc("PublishEvent")
|
||||
event := &proto.SystemEvent{
|
||||
Id: uuid.New().String(),
|
||||
Severity: severity,
|
||||
@@ -1292,13 +1208,6 @@ func (d *Status) PublishEvent(
|
||||
}
|
||||
|
||||
d.eventMux.Lock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("PublishEvent", started)
|
||||
}()
|
||||
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
d.eventQueue.Add(event)
|
||||
@@ -1316,7 +1225,6 @@ func (d *Status) PublishEvent(
|
||||
|
||||
// SubscribeToEvents returns a new event subscription
|
||||
func (d *Status) SubscribeToEvents() *EventSubscription {
|
||||
d.profile.inc("SubscribeToEvents")
|
||||
d.eventMux.Lock()
|
||||
defer d.eventMux.Unlock()
|
||||
|
||||
@@ -1332,7 +1240,6 @@ func (d *Status) SubscribeToEvents() *EventSubscription {
|
||||
|
||||
// UnsubscribeFromEvents removes an event subscription
|
||||
func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
|
||||
d.profile.inc("UnsubscribeFromEvents")
|
||||
if sub == nil {
|
||||
return
|
||||
}
|
||||
@@ -1348,12 +1255,10 @@ func (d *Status) UnsubscribeFromEvents(sub *EventSubscription) {
|
||||
|
||||
// GetEventHistory returns all events in the queue
|
||||
func (d *Status) GetEventHistory() []*proto.SystemEvent {
|
||||
d.profile.inc("GetEventHistory")
|
||||
return d.eventQueue.GetAll()
|
||||
}
|
||||
|
||||
func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
|
||||
d.profile.inc("SetWgIface")
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -1361,15 +1266,7 @@ func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
|
||||
}
|
||||
|
||||
func (d *Status) PeersStatus() (*configurer.Stats, error) {
|
||||
d.profile.inc("PeersStatus")
|
||||
d.mux.RLock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("PeersStatus", started)
|
||||
}()
|
||||
|
||||
defer d.mux.RUnlock()
|
||||
if d.wgIface == nil {
|
||||
return nil, fmt.Errorf("wgInterface is nil, cannot retrieve peers status")
|
||||
@@ -1382,15 +1279,7 @@ func (d *Status) PeersStatus() (*configurer.Stats, error) {
|
||||
// and updates the cached peer states. This ensures accurate handshake times and
|
||||
// transfer statistics in status reports without running full health probes.
|
||||
func (d *Status) RefreshWireGuardStats() error {
|
||||
d.profile.inc("RefreshWireGuardStats")
|
||||
d.mux.Lock()
|
||||
|
||||
// debug lines
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
debugElapsed("RefreshWireGuardStats", started)
|
||||
}()
|
||||
|
||||
defer d.mux.Unlock()
|
||||
|
||||
if d.wgIface == nil {
|
||||
@@ -1555,10 +1444,3 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
|
||||
|
||||
return &pbFullStatus
|
||||
}
|
||||
|
||||
func debugElapsed(msg string, startTime time.Time) {
|
||||
if elapsed := time.Since(startTime); elapsed > 1*time.Second {
|
||||
log.Infof("run %s took %s", msg, elapsed)
|
||||
debug.PrintStack()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,97 +0,0 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type StatusProfile struct {
|
||||
counts sync.Map
|
||||
}
|
||||
|
||||
func NewStatusProfile(ctx context.Context) *StatusProfile {
|
||||
s := &StatusProfile{}
|
||||
go s.Start(ctx)
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *StatusProfile) Start(ctx context.Context) {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.logCounts()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *StatusProfile) inc(method string) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
if v, ok := s.counts.Load(method); ok {
|
||||
v.(*atomic.Int64).Add(1)
|
||||
return
|
||||
}
|
||||
cnt := &atomic.Int64{}
|
||||
actual, _ := s.counts.LoadOrStore(method, cnt)
|
||||
actual.(*atomic.Int64).Add(1)
|
||||
}
|
||||
|
||||
func (s *StatusProfile) snapshot() map[string]int64 {
|
||||
out := make(map[string]int64)
|
||||
s.counts.Range(func(k, v any) bool {
|
||||
out[k.(string)] = v.(*atomic.Int64).Load()
|
||||
return true
|
||||
})
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *StatusProfile) logCounts() {
|
||||
counts := s.snapshot()
|
||||
if len(counts) == 0 {
|
||||
log.Infof("status profile: no Status method calls so far")
|
||||
return
|
||||
}
|
||||
|
||||
type kv struct {
|
||||
method string
|
||||
count int64
|
||||
}
|
||||
sorted := make([]kv, 0, len(counts))
|
||||
var total int64
|
||||
for m, c := range counts {
|
||||
if c == 0 {
|
||||
continue
|
||||
}
|
||||
sorted = append(sorted, kv{m, c})
|
||||
total += c
|
||||
}
|
||||
sort.Slice(sorted, func(i, j int) bool {
|
||||
if sorted[i].count != sorted[j].count {
|
||||
return sorted[i].count > sorted[j].count
|
||||
}
|
||||
return sorted[i].method < sorted[j].method
|
||||
})
|
||||
|
||||
var b strings.Builder
|
||||
for i, e := range sorted {
|
||||
if i > 0 {
|
||||
b.WriteString(", ")
|
||||
}
|
||||
b.WriteString(e.method)
|
||||
b.WriteByte('=')
|
||||
b.WriteString(strconv.FormatInt(e.count, 10))
|
||||
}
|
||||
log.Infof("status profile (cumulative total=%d): %s", total, b.String())
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -165,10 +164,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
|
||||
return
|
||||
}
|
||||
|
||||
if candidateViaRoutes(candidate, haRoutes) {
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.agent.AddRemoteCandidate(candidate); err != nil {
|
||||
w.log.Errorf("error while handling remote candidate")
|
||||
return
|
||||
@@ -466,7 +461,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
|
||||
}
|
||||
|
||||
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
|
||||
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.config.Key)
|
||||
|
||||
pairStat, ok := agent.GetSelectedCandidatePairStats()
|
||||
@@ -589,34 +584,6 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
|
||||
return ec, nil
|
||||
}
|
||||
|
||||
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {
|
||||
addr, err := netip.ParseAddr(candidate.Address())
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse IP address %s: %v", candidate.Address(), err)
|
||||
return false
|
||||
}
|
||||
|
||||
var routePrefixes []netip.Prefix
|
||||
for _, routes := range clientRoutes {
|
||||
if len(routes) > 0 && routes[0] != nil {
|
||||
routePrefixes = append(routePrefixes, routes[0].Network)
|
||||
}
|
||||
}
|
||||
|
||||
for _, prefix := range routePrefixes {
|
||||
// default route is handled by route exclusion / ip rules
|
||||
if prefix.Bits() == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if prefix.Contains(addr) {
|
||||
log.Debugf("Ignoring candidate [%s], its address is part of routed network %s", candidate.String(), prefix)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func isRelayCandidate(candidate ice.Candidate) bool {
|
||||
return candidate.Type() == ice.CandidateTypeRelay
|
||||
}
|
||||
|
||||
@@ -121,9 +121,12 @@ func (r *SysOps) addRouteToNonVPNIntf(prefix netip.Prefix, vpnIntf wgIface, init
|
||||
return Nexthop{}, vars.ErrRouteNotAllowed
|
||||
}
|
||||
|
||||
// Check if the prefix is part of any local subnets
|
||||
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
|
||||
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
|
||||
// BSDs blackhole a /32 added inside a directly-connected subnet; Linux/Windows need it to beat the wt0 route.
|
||||
switch runtime.GOOS {
|
||||
case "darwin", "freebsd", "netbsd", "openbsd", "dragonfly":
|
||||
if isLocal, subnet := r.isPrefixInLocalSubnets(prefix); isLocal {
|
||||
return Nexthop{}, fmt.Errorf("prefix %s is part of local subnet %s: %w", prefix, subnet, vars.ErrRouteNotAllowed)
|
||||
}
|
||||
}
|
||||
|
||||
// Determine the exit interface and next hop for the prefix, so we can add a specific route
|
||||
|
||||
@@ -136,7 +136,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
|
||||
networksDisabled: networksDisabled,
|
||||
jwtCache: newJWTCache(),
|
||||
}
|
||||
go s.statusRecorder.StartProfile(ctx)
|
||||
agent := &serverAgent{s}
|
||||
s.sleepHandler = sleephandler.New(agent)
|
||||
s.startSleepDetector()
|
||||
|
||||
91
combined/cmd/admin.go
Normal file
91
combined/cmd/admin.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/netbirdio/netbird/formatter/hook"
|
||||
admincmd "github.com/netbirdio/netbird/management/cmd/admin"
|
||||
tokencmd "github.com/netbirdio/netbird/management/cmd/token"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
|
||||
// newAdminCommands creates the admin command tree with combined-specific resource openers.
|
||||
func newAdminCommands() *cobra.Command {
|
||||
cmd := admincmd.NewCommands(withAdminResources)
|
||||
cmd.AddCommand(tokencmd.NewCommands(withAdminTokenStore))
|
||||
return cmd
|
||||
}
|
||||
|
||||
// withAdminResources loads the combined YAML config, initializes stores, and calls fn.
|
||||
func withAdminResources(cmd *cobra.Command, fn func(ctx context.Context, resources admincmd.Resources) error) error {
|
||||
return withAdminStore(cmd, func(ctx context.Context, managementStore store.Store, cfg *CombinedConfig) error {
|
||||
mgmtConfig, err := cfg.ToManagementConfig()
|
||||
if err != nil {
|
||||
return fmt.Errorf("create management config: %w", err)
|
||||
}
|
||||
|
||||
idpStorage, err := admincmd.OpenEmbeddedIDPStorage(mgmtConfig.EmbeddedIdP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := idpStorage.Close(); err != nil {
|
||||
log.Debugf("close embedded IdP storage: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, admincmd.Resources{Store: managementStore, IDPStorage: idpStorage})
|
||||
})
|
||||
}
|
||||
|
||||
// withAdminTokenStore opens only the management store for admin token commands.
|
||||
func withAdminTokenStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store) error) error {
|
||||
return withAdminStore(cmd, func(ctx context.Context, managementStore store.Store, _ *CombinedConfig) error {
|
||||
return fn(ctx, managementStore)
|
||||
})
|
||||
}
|
||||
|
||||
func withAdminStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store, cfg *CombinedConfig) error) error {
|
||||
if err := util.InitLog("error", "console"); err != nil {
|
||||
return fmt.Errorf("init log: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||
|
||||
cfg, err := LoadConfig(configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
if dsn := cfg.Server.Store.DSN; dsn != "" {
|
||||
switch strings.ToLower(cfg.Server.Store.Engine) {
|
||||
case "postgres":
|
||||
os.Setenv("NB_STORE_ENGINE_POSTGRES_DSN", dsn)
|
||||
case "mysql":
|
||||
os.Setenv("NB_STORE_ENGINE_MYSQL_DSN", dsn)
|
||||
}
|
||||
}
|
||||
if file := cfg.Server.Store.File; file != "" {
|
||||
os.Setenv("NB_STORE_ENGINE_SQLITE_FILE", file)
|
||||
}
|
||||
|
||||
managementStore, err := store.NewStore(ctx, types.Engine(cfg.Management.Store.Engine), cfg.Management.DataDir, nil, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create store: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := managementStore.Close(ctx); err != nil {
|
||||
log.Debugf("close store: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, managementStore, cfg)
|
||||
}
|
||||
@@ -64,7 +64,7 @@ func init() {
|
||||
rootCmd.PersistentFlags().StringVarP(&configPath, "config", "c", "", "path to YAML configuration file (required)")
|
||||
_ = rootCmd.MarkPersistentFlagRequired("config")
|
||||
|
||||
rootCmd.AddCommand(newTokenCommands())
|
||||
rootCmd.AddCommand(newAdminCommands())
|
||||
}
|
||||
|
||||
func RootCmd() *cobra.Command {
|
||||
|
||||
@@ -1,63 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/netbirdio/netbird/formatter/hook"
|
||||
tokencmd "github.com/netbirdio/netbird/management/cmd/token"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
|
||||
// newTokenCommands creates the token command tree with combined-specific store opener.
|
||||
func newTokenCommands() *cobra.Command {
|
||||
return tokencmd.NewCommands(withTokenStore)
|
||||
}
|
||||
|
||||
// withTokenStore loads the combined YAML config, initializes the store, and calls fn.
|
||||
func withTokenStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store) error) error {
|
||||
if err := util.InitLog("error", "console"); err != nil {
|
||||
return fmt.Errorf("init log: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||
|
||||
cfg, err := LoadConfig(configPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
if dsn := cfg.Server.Store.DSN; dsn != "" {
|
||||
switch strings.ToLower(cfg.Server.Store.Engine) {
|
||||
case "postgres":
|
||||
os.Setenv("NB_STORE_ENGINE_POSTGRES_DSN", dsn)
|
||||
case "mysql":
|
||||
os.Setenv("NB_STORE_ENGINE_MYSQL_DSN", dsn)
|
||||
}
|
||||
}
|
||||
if file := cfg.Server.Store.File; file != "" {
|
||||
os.Setenv("NB_STORE_ENGINE_SQLITE_FILE", file)
|
||||
}
|
||||
|
||||
datadir := cfg.Management.DataDir
|
||||
engine := types.Engine(cfg.Management.Store.Engine)
|
||||
|
||||
s, err := store.NewStore(ctx, engine, datadir, nil, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create store: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := s.Close(ctx); err != nil {
|
||||
log.Debugf("close store: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, s)
|
||||
}
|
||||
89
management/cmd/admin.go
Normal file
89
management/cmd/admin.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/netbirdio/netbird/formatter/hook"
|
||||
admincmd "github.com/netbirdio/netbird/management/cmd/admin"
|
||||
tokencmd "github.com/netbirdio/netbird/management/cmd/token"
|
||||
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
|
||||
var adminDatadir string
|
||||
|
||||
// newAdminCommands creates the admin command tree with management-specific resource openers.
|
||||
func newAdminCommands() *cobra.Command {
|
||||
cmd := admincmd.NewCommands(withAdminResources)
|
||||
cmd.PersistentFlags().StringVar(&adminDatadir, "datadir", "", "Override the data directory from config (used for store.db and the default idp.db)")
|
||||
cmd.AddCommand(tokencmd.NewCommands(withAdminTokenStore))
|
||||
return cmd
|
||||
}
|
||||
|
||||
// withAdminResources initializes logging, loads config, opens the management store
|
||||
// and embedded IdP storage, and calls fn.
|
||||
func withAdminResources(cmd *cobra.Command, fn func(ctx context.Context, resources admincmd.Resources) error) error {
|
||||
return withAdminStore(cmd, func(ctx context.Context, managementStore store.Store, config *nbconfig.Config) error {
|
||||
idpStorage, err := admincmd.OpenEmbeddedIDPStorage(config.EmbeddedIdP)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err := idpStorage.Close(); err != nil {
|
||||
log.Debugf("close embedded IdP storage: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, admincmd.Resources{Store: managementStore, IDPStorage: idpStorage})
|
||||
})
|
||||
}
|
||||
|
||||
// withAdminTokenStore opens only the management store for admin token commands.
|
||||
func withAdminTokenStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store) error) error {
|
||||
return withAdminStore(cmd, func(ctx context.Context, managementStore store.Store, _ *nbconfig.Config) error {
|
||||
return fn(ctx, managementStore)
|
||||
})
|
||||
}
|
||||
|
||||
func withAdminStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store, config *nbconfig.Config) error) error {
|
||||
if err := util.InitLog("error", "console"); err != nil {
|
||||
return fmt.Errorf("init log: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||
|
||||
config, err := LoadMgmtConfig(ctx, nbconfig.MgmtConfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
datadir := config.Datadir
|
||||
if adminDatadir != "" {
|
||||
oldDatadir := datadir
|
||||
datadir = adminDatadir
|
||||
if config.EmbeddedIdP != nil && config.EmbeddedIdP.Storage.Type == "sqlite3" {
|
||||
defaultIDPFile := filepath.Join(oldDatadir, "idp.db")
|
||||
if config.EmbeddedIdP.Storage.Config.File == "" || config.EmbeddedIdP.Storage.Config.File == defaultIDPFile {
|
||||
config.EmbeddedIdP.Storage.Config.File = filepath.Join(datadir, "idp.db")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
managementStore, err := store.NewStore(ctx, config.StoreConfig.Engine, datadir, nil, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create store: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := managementStore.Close(ctx); err != nil {
|
||||
log.Debugf("close store: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, managementStore, config)
|
||||
}
|
||||
441
management/cmd/admin/admin.go
Normal file
441
management/cmd/admin/admin.go
Normal file
@@ -0,0 +1,441 @@
|
||||
// Package admincmd provides reusable cobra commands for self-hosted administrator helpers.
|
||||
// Both the management and combined binaries use these commands, each providing
|
||||
// their own opener to handle config loading and storage initialization.
|
||||
package admincmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/dexidp/dex/storage"
|
||||
"github.com/spf13/cobra"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
nbdex "github.com/netbirdio/netbird/idp/dex"
|
||||
"github.com/netbirdio/netbird/management/server"
|
||||
"github.com/netbirdio/netbird/management/server/idp"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
)
|
||||
|
||||
const (
|
||||
localConnectorID = "local"
|
||||
dashboardClientID = "netbird-dashboard"
|
||||
cliClientID = "netbird-cli"
|
||||
defaultTOTPAuthenticatorID = "default-totp"
|
||||
)
|
||||
|
||||
// Resources contains the storages required by the admin commands.
|
||||
type Resources struct {
|
||||
Store store.Store
|
||||
IDPStorage storage.Storage
|
||||
}
|
||||
|
||||
// Opener initializes command resources from the command context and calls fn.
|
||||
type Opener func(cmd *cobra.Command, fn func(ctx context.Context, resources Resources) error) error
|
||||
|
||||
type userSelector struct {
|
||||
email string
|
||||
userID string
|
||||
}
|
||||
|
||||
func (s userSelector) normalized() userSelector {
|
||||
return userSelector{
|
||||
email: strings.TrimSpace(s.email),
|
||||
userID: strings.TrimSpace(s.userID),
|
||||
}
|
||||
}
|
||||
|
||||
func (s userSelector) validate() error {
|
||||
s = s.normalized()
|
||||
if (s.email == "") == (s.userID == "") {
|
||||
return fmt.Errorf("provide exactly one of --email or --user-id")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewCommands creates the admin command tree with the given resource opener.
|
||||
func NewCommands(opener Opener) *cobra.Command {
|
||||
adminCmd := &cobra.Command{
|
||||
Use: "admin",
|
||||
Short: "Self-hosted administrator helpers",
|
||||
Long: "Administrative helpers for self-hosted deployments using the embedded identity provider.",
|
||||
}
|
||||
|
||||
userCmd := &cobra.Command{
|
||||
Use: "user",
|
||||
Short: "Manage local embedded IdP users",
|
||||
}
|
||||
|
||||
var passwordSelector userSelector
|
||||
var password string
|
||||
var passwordFile string
|
||||
passwordCmd := &cobra.Command{
|
||||
Use: "change-password (--email email | --user-id id) (--password password | --password-file path)",
|
||||
Aliases: []string{"set-password"},
|
||||
Short: "Change a local user's password",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
newPassword, err := resolvePasswordInput(cmd, password, passwordFile)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return opener(cmd, func(ctx context.Context, resources Resources) error {
|
||||
return runChangePassword(ctx, resources.IDPStorage, cmd.OutOrStdout(), passwordSelector, newPassword)
|
||||
})
|
||||
},
|
||||
}
|
||||
addUserSelectorFlags(passwordCmd, &passwordSelector)
|
||||
passwordCmd.Flags().StringVar(&password, "password", "", "New password for the user")
|
||||
passwordCmd.Flags().StringVar(&passwordFile, "password-file", "", "Read new password from file ('-' for stdin)")
|
||||
|
||||
var resetSelector userSelector
|
||||
resetMFACmd := &cobra.Command{
|
||||
Use: "reset-mfa (--email email | --user-id id)",
|
||||
Short: "Reset a local user's MFA enrollment",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return opener(cmd, func(ctx context.Context, resources Resources) error {
|
||||
return runResetMFA(ctx, resources.IDPStorage, cmd.OutOrStdout(), resetSelector)
|
||||
})
|
||||
},
|
||||
}
|
||||
addUserSelectorFlags(resetMFACmd, &resetSelector)
|
||||
|
||||
userCmd.AddCommand(passwordCmd, resetMFACmd)
|
||||
|
||||
mfaCmd := &cobra.Command{
|
||||
Use: "mfa",
|
||||
Short: "Manage local MFA for embedded IdP users",
|
||||
}
|
||||
|
||||
enableCmd := &cobra.Command{
|
||||
Use: "enable",
|
||||
Short: "Enable MFA for local embedded IdP users",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return opener(cmd, func(ctx context.Context, resources Resources) error {
|
||||
return runSetMFAEnabled(ctx, resources, cmd.OutOrStdout(), true)
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
disableCmd := &cobra.Command{
|
||||
Use: "disable",
|
||||
Short: "Disable MFA for local embedded IdP users",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return opener(cmd, func(ctx context.Context, resources Resources) error {
|
||||
return runSetMFAEnabled(ctx, resources, cmd.OutOrStdout(), false)
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
statusCmd := &cobra.Command{
|
||||
Use: "status",
|
||||
Short: "Show local MFA status",
|
||||
Args: cobra.NoArgs,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return opener(cmd, func(ctx context.Context, resources Resources) error {
|
||||
return runMFAStatus(ctx, resources, cmd.OutOrStdout())
|
||||
})
|
||||
},
|
||||
}
|
||||
|
||||
mfaCmd.AddCommand(enableCmd, disableCmd, statusCmd)
|
||||
adminCmd.AddCommand(userCmd, mfaCmd)
|
||||
return adminCmd
|
||||
}
|
||||
|
||||
// OpenEmbeddedIDPStorage opens the Dex storage configured for the embedded IdP.
|
||||
func OpenEmbeddedIDPStorage(cfg *idp.EmbeddedIdPConfig) (storage.Storage, error) {
|
||||
if cfg == nil || !cfg.Enabled {
|
||||
return nil, fmt.Errorf("admin commands require the embedded IdP to be enabled")
|
||||
}
|
||||
|
||||
yamlConfig, err := cfg.ToYAMLConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build embedded IdP config: %w", err)
|
||||
}
|
||||
|
||||
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
|
||||
st, err := yamlConfig.Storage.OpenStorage(logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("open embedded IdP storage: %w", err)
|
||||
}
|
||||
return st, nil
|
||||
}
|
||||
|
||||
func addUserSelectorFlags(cmd *cobra.Command, selector *userSelector) {
|
||||
cmd.Flags().StringVar(&selector.email, "email", "", "User email")
|
||||
cmd.Flags().StringVar(&selector.userID, "user-id", "", "User ID")
|
||||
}
|
||||
|
||||
func resolvePasswordInput(cmd *cobra.Command, password, passwordFile string) (string, error) {
|
||||
if password != "" && passwordFile != "" {
|
||||
return "", fmt.Errorf("provide only one of --password or --password-file")
|
||||
}
|
||||
if passwordFile == "" {
|
||||
return password, nil
|
||||
}
|
||||
|
||||
var data []byte
|
||||
var err error
|
||||
if passwordFile == "-" {
|
||||
data, err = io.ReadAll(cmd.InOrStdin())
|
||||
} else {
|
||||
data, err = os.ReadFile(passwordFile)
|
||||
}
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("read password: %w", err)
|
||||
}
|
||||
return strings.TrimRight(string(data), "\r\n"), nil
|
||||
}
|
||||
|
||||
func runChangePassword(ctx context.Context, idpStorage storage.Storage, w io.Writer, selector userSelector, password string) error {
|
||||
if idpStorage == nil {
|
||||
return fmt.Errorf("embedded IdP storage is required")
|
||||
}
|
||||
selector = selector.normalized()
|
||||
if err := selector.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
if password == "" {
|
||||
return fmt.Errorf("password is required")
|
||||
}
|
||||
if err := server.ValidatePassword(password); err != nil {
|
||||
return fmt.Errorf("invalid password: %w", err)
|
||||
}
|
||||
|
||||
user, err := findLocalUser(ctx, idpStorage, selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
|
||||
if err != nil {
|
||||
return fmt.Errorf("hash password: %w", err)
|
||||
}
|
||||
|
||||
if err := idpStorage.UpdatePassword(ctx, user.Email, func(old storage.Password) (storage.Password, error) {
|
||||
old.Hash = hash
|
||||
return old, nil
|
||||
}); err != nil {
|
||||
return fmt.Errorf("update password for %s: %w", user.Email, err)
|
||||
}
|
||||
|
||||
if err := deleteLocalAuthSession(ctx, idpStorage, user.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _ = fmt.Fprintf(w, "Password updated for %s.\n", user.Email)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runResetMFA(ctx context.Context, idpStorage storage.Storage, w io.Writer, selector userSelector) error {
|
||||
if idpStorage == nil {
|
||||
return fmt.Errorf("embedded IdP storage is required")
|
||||
}
|
||||
selector = selector.normalized()
|
||||
if err := selector.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
user, err := findLocalUser(ctx, idpStorage, selector)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reset := false
|
||||
err = idpStorage.UpdateUserIdentity(ctx, user.UserID, localConnectorID, func(old storage.UserIdentity) (storage.UserIdentity, error) {
|
||||
reset = reset || len(old.MFASecrets) > 0 || len(old.WebAuthnCredentials) > 0
|
||||
old.MFASecrets = map[string]*storage.MFASecret{}
|
||||
old.WebAuthnCredentials = map[string][]storage.WebAuthnCredential{}
|
||||
return old, nil
|
||||
})
|
||||
if errors.Is(err, storage.ErrNotFound) {
|
||||
if err := deleteLocalAuthSession(ctx, idpStorage, user.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
_, _ = fmt.Fprintf(w, "No MFA enrollment found for %s.\n", user.Email)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("reset MFA for %s: %w", user.Email, err)
|
||||
}
|
||||
|
||||
if err := deleteLocalAuthSession(ctx, idpStorage, user.UserID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reset {
|
||||
_, _ = fmt.Fprintf(w, "MFA reset for %s. The user will re-enroll at next login.\n", user.Email)
|
||||
} else {
|
||||
_, _ = fmt.Fprintf(w, "No MFA enrollment found for %s.\n", user.Email)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func runSetMFAEnabled(ctx context.Context, resources Resources, w io.Writer, enabled bool) error {
|
||||
if resources.Store == nil {
|
||||
return fmt.Errorf("management store is required")
|
||||
}
|
||||
if resources.IDPStorage == nil {
|
||||
return fmt.Errorf("embedded IdP storage is required")
|
||||
}
|
||||
|
||||
accounts := resources.Store.GetAllAccounts(ctx)
|
||||
if len(accounts) != 1 {
|
||||
return fmt.Errorf("expected exactly one account, got %d; local MFA is supported only in single-account embedded IdP deployments", len(accounts))
|
||||
}
|
||||
|
||||
settings := &types.Settings{}
|
||||
if accounts[0].Settings != nil {
|
||||
settings = accounts[0].Settings.Copy()
|
||||
}
|
||||
settings.LocalMfaEnabled = enabled
|
||||
if err := resources.Store.SaveAccountSettings(ctx, accounts[0].Id, settings); err != nil {
|
||||
return fmt.Errorf("save local MFA account setting: %w", err)
|
||||
}
|
||||
|
||||
if err := setIDPClientsMFA(ctx, resources.IDPStorage, enabled); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
state := "disabled"
|
||||
if enabled {
|
||||
state = "enabled"
|
||||
}
|
||||
_, _ = fmt.Fprintf(w, "Local MFA %s.\n", state)
|
||||
return nil
|
||||
}
|
||||
|
||||
func runMFAStatus(ctx context.Context, resources Resources, w io.Writer) error {
|
||||
if resources.Store == nil {
|
||||
return fmt.Errorf("management store is required")
|
||||
}
|
||||
if resources.IDPStorage == nil {
|
||||
return fmt.Errorf("embedded IdP storage is required")
|
||||
}
|
||||
|
||||
accounts := resources.Store.GetAllAccounts(ctx)
|
||||
accountStatus := "unknown"
|
||||
if len(accounts) == 1 && accounts[0].Settings != nil {
|
||||
accountStatus = "disabled"
|
||||
if accounts[0].Settings.LocalMfaEnabled {
|
||||
accountStatus = "enabled"
|
||||
}
|
||||
}
|
||||
|
||||
clientStatus, err := idpClientsMFAStatus(ctx, resources.IDPStorage)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _ = fmt.Fprintf(w, "Account setting: %s\n", accountStatus)
|
||||
_, _ = fmt.Fprintf(w, "Embedded IdP clients: %s\n", clientStatus)
|
||||
return nil
|
||||
}
|
||||
|
||||
func findLocalUser(ctx context.Context, idpStorage storage.Storage, selector userSelector) (storage.Password, error) {
|
||||
selector = selector.normalized()
|
||||
if err := selector.validate(); err != nil {
|
||||
return storage.Password{}, err
|
||||
}
|
||||
|
||||
if selector.email != "" {
|
||||
user, err := idpStorage.GetPassword(ctx, selector.email)
|
||||
if errors.Is(err, storage.ErrNotFound) {
|
||||
return storage.Password{}, fmt.Errorf("local user with email %q not found", selector.email)
|
||||
}
|
||||
if err != nil {
|
||||
return storage.Password{}, fmt.Errorf("get local user by email %q: %w", selector.email, err)
|
||||
}
|
||||
return user, nil
|
||||
}
|
||||
|
||||
rawUserID := selector.userID
|
||||
if decodedUserID, _, err := nbdex.DecodeDexUserID(selector.userID); err == nil && decodedUserID != "" {
|
||||
rawUserID = decodedUserID
|
||||
}
|
||||
|
||||
users, err := idpStorage.ListPasswords(ctx)
|
||||
if err != nil {
|
||||
return storage.Password{}, fmt.Errorf("list local users: %w", err)
|
||||
}
|
||||
for _, user := range users {
|
||||
if user.UserID == rawUserID || user.UserID == selector.userID {
|
||||
return user, nil
|
||||
}
|
||||
}
|
||||
|
||||
return storage.Password{}, fmt.Errorf("local user with ID %q not found", selector.userID)
|
||||
}
|
||||
|
||||
func deleteLocalAuthSession(ctx context.Context, idpStorage storage.Storage, userID string) error {
|
||||
err := idpStorage.DeleteAuthSession(ctx, userID, localConnectorID)
|
||||
if err == nil || errors.Is(err, storage.ErrNotFound) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("delete local auth session for user %s: %w", userID, err)
|
||||
}
|
||||
|
||||
func setIDPClientsMFA(ctx context.Context, idpStorage storage.Storage, enabled bool) error {
|
||||
var mfaChain []string
|
||||
if enabled {
|
||||
mfaChain = []string{defaultTOTPAuthenticatorID}
|
||||
}
|
||||
|
||||
for _, clientID := range []string{cliClientID, dashboardClientID} {
|
||||
if err := idpStorage.UpdateClient(ctx, clientID, func(old storage.Client) (storage.Client, error) {
|
||||
old.MFAChain = mfaChain
|
||||
return old, nil
|
||||
}); err != nil {
|
||||
if errors.Is(err, storage.ErrNotFound) {
|
||||
return fmt.Errorf("embedded IdP client %q not found; start the management server once before toggling MFA", clientID)
|
||||
}
|
||||
return fmt.Errorf("update MFA chain on embedded IdP client %q: %w", clientID, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func idpClientsMFAStatus(ctx context.Context, idpStorage storage.Storage) (string, error) {
|
||||
clientIDs := []string{cliClientID, dashboardClientID}
|
||||
enabledCount := 0
|
||||
for _, clientID := range clientIDs {
|
||||
client, err := idpStorage.GetClient(ctx, clientID)
|
||||
if errors.Is(err, storage.ErrNotFound) {
|
||||
return "unknown", fmt.Errorf("embedded IdP client %q not found", clientID)
|
||||
}
|
||||
if err != nil {
|
||||
return "unknown", fmt.Errorf("get embedded IdP client %q: %w", clientID, err)
|
||||
}
|
||||
if hasAuthenticator(client.MFAChain, defaultTOTPAuthenticatorID) {
|
||||
enabledCount++
|
||||
}
|
||||
}
|
||||
|
||||
switch enabledCount {
|
||||
case 0:
|
||||
return "disabled", nil
|
||||
case len(clientIDs):
|
||||
return "enabled", nil
|
||||
default:
|
||||
return "partially enabled", nil
|
||||
}
|
||||
}
|
||||
|
||||
func hasAuthenticator(chain []string, authenticatorID string) bool {
|
||||
for _, id := range chain {
|
||||
if id == authenticatorID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
160
management/cmd/admin/admin_test.go
Normal file
160
management/cmd/admin/admin_test.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package admincmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dexidp/dex/storage"
|
||||
"github.com/dexidp/dex/storage/memory"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/crypto/bcrypt"
|
||||
|
||||
nbdex "github.com/netbirdio/netbird/idp/dex"
|
||||
)
|
||||
|
||||
func newTestIDPStorage(t *testing.T) storage.Storage {
|
||||
t.Helper()
|
||||
|
||||
st := memory.New(slog.New(slog.NewTextHandler(io.Discard, nil)))
|
||||
hash, err := bcrypt.GenerateFromPassword([]byte("OldPass1!"), bcrypt.DefaultCost)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, st.CreatePassword(context.Background(), storage.Password{
|
||||
Email: "user@example.com",
|
||||
Username: "User",
|
||||
UserID: "user-1",
|
||||
Hash: hash,
|
||||
}))
|
||||
require.NoError(t, st.CreateUserIdentity(context.Background(), storage.UserIdentity{
|
||||
UserID: "user-1",
|
||||
ConnectorID: localConnectorID,
|
||||
MFASecrets: map[string]*storage.MFASecret{
|
||||
defaultTOTPAuthenticatorID: {
|
||||
AuthenticatorID: defaultTOTPAuthenticatorID,
|
||||
Type: "TOTP",
|
||||
Secret: "otpauth://totp/NetBird:user@example.com?secret=ABC",
|
||||
Confirmed: true,
|
||||
CreatedAt: time.Now(),
|
||||
},
|
||||
},
|
||||
WebAuthnCredentials: map[string][]storage.WebAuthnCredential{
|
||||
"webauthn": {{CredentialID: []byte("credential")}},
|
||||
},
|
||||
}))
|
||||
require.NoError(t, st.CreateAuthSession(context.Background(), storage.AuthSession{
|
||||
UserID: "user-1",
|
||||
ConnectorID: localConnectorID,
|
||||
Nonce: "nonce",
|
||||
}))
|
||||
require.NoError(t, st.CreateClient(context.Background(), storage.Client{ID: cliClientID, Name: "CLI"}))
|
||||
require.NoError(t, st.CreateClient(context.Background(), storage.Client{ID: dashboardClientID, Name: "Dashboard"}))
|
||||
|
||||
return st
|
||||
}
|
||||
|
||||
func TestRunChangePassword(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st := newTestIDPStorage(t)
|
||||
var out bytes.Buffer
|
||||
|
||||
err := runChangePassword(ctx, st, &out, userSelector{email: "user@example.com"}, "NewPass1!")
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, out.String(), "Password updated")
|
||||
|
||||
user, err := st.GetPassword(ctx, "user@example.com")
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, bcrypt.CompareHashAndPassword(user.Hash, []byte("NewPass1!")))
|
||||
|
||||
_, err = st.GetAuthSession(ctx, "user-1", localConnectorID)
|
||||
require.ErrorIs(t, err, storage.ErrNotFound)
|
||||
}
|
||||
|
||||
func TestRunChangePasswordValidatesPassword(t *testing.T) {
|
||||
st := newTestIDPStorage(t)
|
||||
err := runChangePassword(context.Background(), st, io.Discard, userSelector{email: "user@example.com"}, "short")
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "invalid password")
|
||||
}
|
||||
|
||||
func TestRunResetMFA(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st := newTestIDPStorage(t)
|
||||
var out bytes.Buffer
|
||||
|
||||
encodedUserID := nbdex.EncodeDexUserID("user-1", localConnectorID)
|
||||
err := runResetMFA(ctx, st, &out, userSelector{userID: encodedUserID})
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, out.String(), "MFA reset")
|
||||
|
||||
identity, err := st.GetUserIdentity(ctx, "user-1", localConnectorID)
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, identity.MFASecrets)
|
||||
require.Empty(t, identity.WebAuthnCredentials)
|
||||
|
||||
_, err = st.GetAuthSession(ctx, "user-1", localConnectorID)
|
||||
require.ErrorIs(t, err, storage.ErrNotFound)
|
||||
}
|
||||
|
||||
func TestRunResetMFAWithoutEnrollment(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st := newTestIDPStorage(t)
|
||||
require.NoError(t, st.UpdateUserIdentity(ctx, "user-1", localConnectorID, func(old storage.UserIdentity) (storage.UserIdentity, error) {
|
||||
old.MFASecrets = nil
|
||||
old.WebAuthnCredentials = nil
|
||||
return old, nil
|
||||
}))
|
||||
|
||||
var out bytes.Buffer
|
||||
err := runResetMFA(ctx, st, &out, userSelector{email: "user@example.com"})
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, out.String(), "No MFA enrollment found")
|
||||
}
|
||||
|
||||
func TestSetIDPClientsMFA(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
st := newTestIDPStorage(t)
|
||||
|
||||
require.NoError(t, setIDPClientsMFA(ctx, st, true))
|
||||
status, err := idpClientsMFAStatus(ctx, st)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "enabled", status)
|
||||
|
||||
require.NoError(t, setIDPClientsMFA(ctx, st, false))
|
||||
status, err = idpClientsMFAStatus(ctx, st)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "disabled", status)
|
||||
}
|
||||
|
||||
func TestUserSelectorValidate(t *testing.T) {
|
||||
require.NoError(t, userSelector{email: " user@example.com "}.validate())
|
||||
require.NoError(t, userSelector{userID: "user-1"}.validate())
|
||||
require.Error(t, userSelector{}.validate())
|
||||
require.Error(t, userSelector{email: "user@example.com", userID: "user-1"}.validate())
|
||||
}
|
||||
|
||||
func TestFindLocalUserNotFound(t *testing.T) {
|
||||
st := newTestIDPStorage(t)
|
||||
_, err := findLocalUser(context.Background(), st, userSelector{email: "missing@example.com"})
|
||||
require.Error(t, err)
|
||||
require.True(t, strings.Contains(err.Error(), "not found"))
|
||||
}
|
||||
|
||||
func TestResolvePasswordInputFromStdin(t *testing.T) {
|
||||
cmd := &cobra.Command{}
|
||||
cmd.SetIn(strings.NewReader("NewPass1!\n"))
|
||||
|
||||
password, err := resolvePasswordInput(cmd, "", "-")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "NewPass1!", password)
|
||||
}
|
||||
|
||||
func TestResolvePasswordInputRejectsMultipleSources(t *testing.T) {
|
||||
_, err := resolvePasswordInput(&cobra.Command{}, "NewPass1!", "-")
|
||||
require.Error(t, err)
|
||||
}
|
||||
@@ -83,7 +83,7 @@ func init() {
|
||||
|
||||
rootCmd.AddCommand(migrationCmd)
|
||||
|
||||
tc := newTokenCommands()
|
||||
tc.PersistentFlags().StringVar(&nbconfig.MgmtConfigPath, "config", defaultMgmtConfig, "Netbird config file location")
|
||||
rootCmd.AddCommand(tc)
|
||||
ac := newAdminCommands()
|
||||
ac.PersistentFlags().StringVar(&nbconfig.MgmtConfigPath, "config", defaultMgmtConfig, "Netbird config file location")
|
||||
rootCmd.AddCommand(ac)
|
||||
}
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/netbirdio/netbird/formatter/hook"
|
||||
tokencmd "github.com/netbirdio/netbird/management/cmd/token"
|
||||
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
|
||||
var tokenDatadir string
|
||||
|
||||
// newTokenCommands creates the token command tree with management-specific store opener.
|
||||
func newTokenCommands() *cobra.Command {
|
||||
cmd := tokencmd.NewCommands(withTokenStore)
|
||||
cmd.PersistentFlags().StringVar(&tokenDatadir, "datadir", "", "Override the data directory from config (where store.db is located)")
|
||||
return cmd
|
||||
}
|
||||
|
||||
// withTokenStore initializes logging, loads config, opens the store, and calls fn.
|
||||
func withTokenStore(cmd *cobra.Command, fn func(ctx context.Context, s store.Store) error) error {
|
||||
if err := util.InitLog("error", "console"); err != nil {
|
||||
return fmt.Errorf("init log: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||
|
||||
config, err := LoadMgmtConfig(ctx, nbconfig.MgmtConfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("load config: %w", err)
|
||||
}
|
||||
|
||||
datadir := config.Datadir
|
||||
if tokenDatadir != "" {
|
||||
datadir = tokenDatadir
|
||||
}
|
||||
|
||||
s, err := store.NewStore(ctx, config.StoreConfig.Engine, datadir, nil, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create store: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := s.Close(ctx); err != nil {
|
||||
log.Debugf("close store: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return fn(ctx, s)
|
||||
}
|
||||
@@ -1205,7 +1205,7 @@ func (s *Server) SyncMeta(ctx context.Context, req *proto.EncryptedMessage) (*pr
|
||||
return nil, msg
|
||||
}
|
||||
|
||||
err = s.accountManager.SyncPeerMeta(ctx, peerKey.String(), extractPeerMeta(ctx, syncMetaReq.GetMeta()), realIP)
|
||||
err = s.accountManager.SyncPeerMeta(ctx, peerKey.String(), extractPeerMeta(ctx, syncMetaReq.GetMeta()))
|
||||
if err != nil {
|
||||
return nil, mapError(ctx, err)
|
||||
}
|
||||
@@ -1254,10 +1254,7 @@ func (s *Server) Logout(ctx context.Context, req *proto.EncryptedMessage) (*prot
|
||||
func toProtocolChecks(ctx context.Context, postureChecks []*posture.Checks) []*proto.Checks {
|
||||
protoChecks := make([]*proto.Checks, 0, len(postureChecks))
|
||||
for _, postureCheck := range postureChecks {
|
||||
check := toProtocolCheck(postureCheck)
|
||||
if check != nil {
|
||||
protoChecks = append(protoChecks, check)
|
||||
}
|
||||
protoChecks = append(protoChecks, toProtocolCheck(postureCheck))
|
||||
}
|
||||
|
||||
return protoChecks
|
||||
@@ -1281,9 +1278,5 @@ func toProtocolCheck(postureCheck *posture.Checks) *proto.Checks {
|
||||
}
|
||||
}
|
||||
|
||||
if len(protoCheck.Files) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return protoCheck
|
||||
}
|
||||
|
||||
@@ -1889,12 +1889,12 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth auth.UserAu
|
||||
// concurrent stream that started earlier loses the optimistic-lock race
|
||||
// in MarkPeerConnected and bails without writing.
|
||||
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
|
||||
peer, netMap, postureChecks, dnsfwdPort, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, RealIP: realIP}, accountID)
|
||||
peer, netMap, postureChecks, dnsfwdPort, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, 0, fmt.Errorf("error syncing peer: %w", err)
|
||||
}
|
||||
|
||||
if err := am.MarkPeerConnected(ctx, peerPubKey, accountID, syncTime.UnixNano(), netMap); err != nil {
|
||||
if err := am.MarkPeerConnected(ctx, peerPubKey, realIP, accountID, syncTime.UnixNano(), netMap); err != nil {
|
||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
||||
}
|
||||
|
||||
@@ -1914,13 +1914,13 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) error {
|
||||
func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error {
|
||||
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, _, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, RealIP: realIP, UpdateAccountPeers: true}, accountID)
|
||||
_, _, _, _, err = am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, UpdateAccountPeers: true}, accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -62,7 +62,7 @@ type Manager interface {
|
||||
GetUserFromUserAuth(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
|
||||
ListUsers(ctx context.Context, accountID string) ([]*types.User, error)
|
||||
GetPeers(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
|
||||
MarkPeerConnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error
|
||||
MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error
|
||||
MarkPeerDisconnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error
|
||||
DeletePeer(ctx context.Context, accountID, peerID, userID string) error
|
||||
UpdatePeer(ctx context.Context, accountID, userID string, p *nbpeer.Peer) (*nbpeer.Peer, error)
|
||||
@@ -123,7 +123,7 @@ type Manager interface {
|
||||
GetValidatedPeers(ctx context.Context, accountID string) (map[string]struct{}, map[string]string, error)
|
||||
SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
|
||||
OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error
|
||||
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) error
|
||||
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
|
||||
FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
|
||||
GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)
|
||||
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
|
||||
|
||||
@@ -1323,17 +1323,17 @@ func (mr *MockManagerMockRecorder) ExtendPeerSession(ctx, peerPubKey, userID int
|
||||
}
|
||||
|
||||
// MarkPeerConnected mocks base method.
|
||||
func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
func (m *MockManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, accountID, sessionStartedAt, nmap)
|
||||
ret := m.ctrl.Call(m, "MarkPeerConnected", ctx, peerKey, realIP, accountID, sessionStartedAt, nmap)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// MarkPeerConnected indicates an expected call of MarkPeerConnected.
|
||||
func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, accountID, sessionStartedAt, nmap interface{}) *gomock.Call {
|
||||
func (mr *MockManagerMockRecorder) MarkPeerConnected(ctx, peerKey, realIP, accountID, sessionStartedAt, nmap interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, accountID, sessionStartedAt, nmap)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MarkPeerConnected", reflect.TypeOf((*MockManager)(nil).MarkPeerConnected), ctx, peerKey, realIP, accountID, sessionStartedAt, nmap)
|
||||
}
|
||||
|
||||
// MarkPeerDisconnected mocks base method.
|
||||
@@ -1586,17 +1586,17 @@ func (mr *MockManagerMockRecorder) SyncPeer(ctx, sync, accountID interface{}) *g
|
||||
}
|
||||
|
||||
// SyncPeerMeta mocks base method.
|
||||
func (m *MockManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta peer.PeerSystemMeta, realIP net.IP) error {
|
||||
func (m *MockManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta peer.PeerSystemMeta) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SyncPeerMeta", ctx, peerPubKey, meta, realIP)
|
||||
ret := m.ctrl.Call(m, "SyncPeerMeta", ctx, peerPubKey, meta)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SyncPeerMeta indicates an expected call of SyncPeerMeta.
|
||||
func (mr *MockManagerMockRecorder) SyncPeerMeta(ctx, peerPubKey, meta, realIP interface{}) *gomock.Call {
|
||||
func (mr *MockManagerMockRecorder) SyncPeerMeta(ctx, peerPubKey, meta interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPeerMeta", reflect.TypeOf((*MockManager)(nil).SyncPeerMeta), ctx, peerPubKey, meta, realIP)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPeerMeta", reflect.TypeOf((*MockManager)(nil).SyncPeerMeta), ctx, peerPubKey, meta)
|
||||
}
|
||||
|
||||
// SyncUserJWTGroups mocks base method.
|
||||
|
||||
@@ -1836,7 +1836,7 @@ func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) {
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to get the account")
|
||||
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), accountID, time.Now().UTC().UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
@@ -1907,7 +1907,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
|
||||
require.NoError(t, err, "unable to get the account")
|
||||
|
||||
// when we mark peer as connected, the peer login expiration routine should trigger
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), accountID, time.Now().UTC().UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
failed := waitTimeout(wg, time.Second)
|
||||
@@ -1916,117 +1916,6 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SchedulesInactivityExpiration(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: true,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
// Establish a session so the matching-token disconnect is actually applied.
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
// Install the mock only now, so the assertion observes the disconnect, not
|
||||
// the earlier connect.
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
// expected: disconnect re-armed the inactivity expiry timer
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected inactivity expiration to be rescheduled when an eligible peer disconnects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SkipsInactivityExpirationWhenDisabled(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
// Peer is eligible (SSO + inactivity enabled) but the account-level setting
|
||||
// stays disabled, so disconnect must not schedule anything.
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: false,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
t.Fatal("inactivity expiration must not be scheduled while the account-level setting is disabled")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
// expected: nothing scheduled
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
@@ -2046,7 +1935,7 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
|
||||
t.Run("disconnect peer when session token matches", func(t *testing.T) {
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
|
||||
@@ -2067,7 +1956,7 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
t.Run("skip disconnect when stored session is newer (zombie stream protection)", func(t *testing.T) {
|
||||
// Newer stream wins on connect (sets SessionStartedAt = now ns).
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
|
||||
@@ -2091,7 +1980,7 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
|
||||
t.Run("skip stale connect when stored session is newer (blocked goroutine protection)", func(t *testing.T) {
|
||||
node2SyncTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, node2SyncTime.UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node2SyncTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "node 2 should connect peer")
|
||||
|
||||
peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
|
||||
@@ -2101,7 +1990,7 @@ func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
"SessionStartedAt should equal node2SyncTime token")
|
||||
|
||||
node1StaleSyncTime := node2SyncTime.Add(-1 * time.Minute)
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, node1StaleSyncTime.UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, node1StaleSyncTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "stale connect should not return error")
|
||||
|
||||
peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
|
||||
@@ -2163,7 +2052,7 @@ func TestDefaultAccountManager_MarkPeerConnected_ConcurrentRace(t *testing.T) {
|
||||
defer done.Done()
|
||||
ready.Done()
|
||||
start.Wait()
|
||||
errs <- manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, token, nil)
|
||||
errs <- manager.MarkPeerConnected(context.Background(), peerPubKey, nil, accountID, token, nil)
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -2204,7 +2093,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test
|
||||
account, err := manager.Store.GetAccount(context.Background(), accountID)
|
||||
require.NoError(t, err, "unable to get the account")
|
||||
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), accountID, time.Now().UTC().UnixNano(), nil)
|
||||
err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), nil, accountID, time.Now().UTC().UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
@@ -39,7 +39,7 @@ type MockAccountManager struct {
|
||||
GetUserFromUserAuthFunc func(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
|
||||
ListUsersFunc func(ctx context.Context, accountID string) ([]*types.User, error)
|
||||
GetPeersFunc func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
|
||||
MarkPeerConnectedFunc func(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error
|
||||
MarkPeerConnectedFunc func(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error
|
||||
MarkPeerDisconnectedFunc func(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64) error
|
||||
SyncAndMarkPeerFunc func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
|
||||
DeletePeerFunc func(ctx context.Context, accountID, peerKey, userID string) error
|
||||
@@ -114,7 +114,7 @@ type MockAccountManager struct {
|
||||
GetIdpManagerFunc func() idp.Manager
|
||||
UpdateIntegratedValidatorFunc func(ctx context.Context, accountID, userID, validator string, groups []string) error
|
||||
GroupValidationFunc func(ctx context.Context, accountId string, groups []string) (bool, error)
|
||||
SyncPeerMetaFunc func(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) error
|
||||
SyncPeerMetaFunc func(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
|
||||
FindExistingPostureCheckFunc func(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
|
||||
GetAccountIDForPeerKeyFunc func(ctx context.Context, peerKey string) (string, error)
|
||||
GetAccountByIDFunc func(ctx context.Context, accountID string, userID string) (*types.Account, error)
|
||||
@@ -345,9 +345,9 @@ func (am *MockAccountManager) GetAccountIDByUserID(ctx context.Context, userAuth
|
||||
}
|
||||
|
||||
// MarkPeerConnected mock implementation of MarkPeerConnected from server.AccountManager interface
|
||||
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, realIP net.IP, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
if am.MarkPeerConnectedFunc != nil {
|
||||
return am.MarkPeerConnectedFunc(ctx, peerKey, accountID, sessionStartedAt, nmap)
|
||||
return am.MarkPeerConnectedFunc(ctx, peerKey, realIP, accountID, sessionStartedAt, nmap)
|
||||
}
|
||||
return status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
|
||||
}
|
||||
@@ -975,9 +975,9 @@ func (am *MockAccountManager) GroupValidation(ctx context.Context, accountId str
|
||||
}
|
||||
|
||||
// SyncPeerMeta mocks SyncPeerMeta of the AccountManager interface
|
||||
func (am *MockAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) error {
|
||||
func (am *MockAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error {
|
||||
if am.SyncPeerMetaFunc != nil {
|
||||
return am.SyncPeerMetaFunc(ctx, peerPubKey, meta, realIP)
|
||||
return am.SyncPeerMetaFunc(ctx, peerPubKey, meta)
|
||||
}
|
||||
return status.Errorf(codes.Unimplemented, "method SyncPeerMeta is not implemented")
|
||||
}
|
||||
|
||||
@@ -74,7 +74,7 @@ func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID
|
||||
//
|
||||
// Disconnects use MarkPeerDisconnected and require the session to match
|
||||
// exactly; see PeerStatus.SessionStartedAt for the protocol.
|
||||
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, realIP net.IP, accountID string, sessionStartedAt int64, nmap *types.NetworkMap) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
am.metrics.AccountManagerMetrics().RecordPeerStatusUpdateDuration(telemetry.PeerStatusConnect, time.Since(start))
|
||||
@@ -102,6 +102,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
|
||||
}
|
||||
am.metrics.AccountManagerMetrics().CountPeerStatusUpdate(telemetry.PeerStatusConnect, telemetry.PeerStatusApplied)
|
||||
|
||||
if am.geo != nil && realIP != nil {
|
||||
am.updatePeerLocationIfChanged(ctx, accountID, peer, realIP)
|
||||
}
|
||||
|
||||
if err = am.schedulePeerExpirations(ctx, accountID, peer); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -188,40 +192,27 @@ func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerP
|
||||
}
|
||||
}
|
||||
|
||||
if peer.AddedWithSSOLogin() && peer.InactivityExpirationEnabled {
|
||||
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed getting account settings to schedule inactivity expiration for peer %s: %v", peer.ID, err)
|
||||
} else if settings.PeerInactivityExpirationEnabled {
|
||||
am.checkAndSchedulePeerInactivityExpiration(ctx, accountID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// resolvePeerLocation looks up the geo location for realIP, returning nil when
|
||||
// there is nothing to apply: geo disabled, no real IP, the IP is unchanged from
|
||||
// what the peer already has, or the lookup failed. Geo lookups are skipped on
|
||||
// same-IP reconnects since they are comparatively expensive. The returned value
|
||||
// is applied by Peer.UpdateMetaIfNew so the change is persisted by its peer save.
|
||||
func (am *DefaultAccountManager) resolvePeerLocation(ctx context.Context, peer *nbpeer.Peer, realIP net.IP) *nbpeer.Location {
|
||||
if am.geo == nil || realIP == nil {
|
||||
return nil
|
||||
}
|
||||
// updatePeerLocationIfChanged refreshes the geolocation on a separate
|
||||
// row update, only when the connection IP actually changed. Geo lookups
|
||||
// are expensive so we skip same-IP reconnects.
|
||||
func (am *DefaultAccountManager) updatePeerLocationIfChanged(ctx context.Context, accountID string, peer *nbpeer.Peer, realIP net.IP) {
|
||||
if peer.Location.ConnectionIP != nil && peer.Location.ConnectionIP.Equal(realIP) {
|
||||
return nil
|
||||
return
|
||||
}
|
||||
location, err := am.geo.Lookup(realIP)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
|
||||
return nil
|
||||
return
|
||||
}
|
||||
return &nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
CountryCode: location.Country.ISOCode,
|
||||
CityName: location.City.Names.En,
|
||||
GeoNameID: location.City.GeonameID,
|
||||
peer.Location.ConnectionIP = realIP
|
||||
peer.Location.CountryCode = location.Country.ISOCode
|
||||
peer.Location.CityName = location.City.Names.En
|
||||
peer.Location.GeoNameID = location.City.GeonameID
|
||||
if err := am.Store.SavePeerLocation(ctx, accountID, peer); err != nil {
|
||||
log.WithContext(ctx).Warnf("could not store location for peer %s: %s", peer.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -989,8 +980,7 @@ func getPeerIPDNSLabel(ip netip.Addr, peerHostName string) (string, error) {
|
||||
// SyncPeer checks whether peer is eligible for receiving NetworkMap (authenticated) and returns its NetworkMap if eligible
|
||||
func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
|
||||
var peer *nbpeer.Peer
|
||||
var ipv6CapabilityChanged bool
|
||||
var metaDiff nbpeer.MetaDiff
|
||||
var updated, versionChanged, ipv6CapabilityChanged bool
|
||||
var err error
|
||||
|
||||
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
|
||||
@@ -1020,10 +1010,9 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
}
|
||||
|
||||
oldHasIPv6Cap := peer.HasCapability(nbpeer.PeerCapabilityIPv6Overlay)
|
||||
newLocation := am.resolvePeerLocation(ctx, peer, sync.RealIP)
|
||||
metaDiff = peer.UpdateMetaIfNew(ctx, sync.Meta, newLocation)
|
||||
updated, versionChanged = peer.UpdateMetaIfNew(ctx, sync.Meta)
|
||||
ipv6CapabilityChanged = oldHasIPv6Cap != peer.HasCapability(nbpeer.PeerCapabilityIPv6Overlay)
|
||||
if metaDiff.Updated() {
|
||||
if updated {
|
||||
am.metrics.AccountManagerMetrics().CountPeerMetUpdate()
|
||||
log.WithContext(ctx).Tracef("peer %s metadata updated", peer.ID)
|
||||
if err = transaction.SavePeer(ctx, accountID, peer); err != nil {
|
||||
@@ -1051,10 +1040,9 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
return nil, nil, nil, 0, err
|
||||
}
|
||||
|
||||
metaDiffAffectsPosture := posture.AffectsPosture(&metaDiff, resPostureChecks)
|
||||
if isStatusChanged || sync.UpdateAccountPeers || ipv6CapabilityChanged || metaDiffAffectsPosture || metaDiff.VersionChanged || metaDiff.Hostname {
|
||||
if isStatusChanged || sync.UpdateAccountPeers || ipv6CapabilityChanged || (updated && (len(resPostureChecks) > 0 || versionChanged)) {
|
||||
changedPeerIDs := []string{peer.ID}
|
||||
affectedPeerIDs := am.syncPeerAffectedPeers(ctx, accountID, peer.ID, nmap, peerNotValid, metaDiffAffectsPosture)
|
||||
affectedPeerIDs := am.syncPeerAffectedPeers(ctx, accountID, peer.ID, nmap, peerNotValid, updated, len(resPostureChecks) > 0)
|
||||
if err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs); err != nil {
|
||||
return nil, nil, nil, 0, fmt.Errorf("notify network map controller of peer update: %w", err)
|
||||
}
|
||||
@@ -1071,8 +1059,8 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
// metadata change that flips a posture result removes this peer from others'
|
||||
// maps asymmetrically; that case (and an invalid peer, whose map is empty) falls
|
||||
// back to the resolver.
|
||||
func (am *DefaultAccountManager) syncPeerAffectedPeers(ctx context.Context, accountID, peerID string, nmap *types.NetworkMap, peerNotValid, metaChangeAffectedPosture bool) []string {
|
||||
if peerNotValid || metaChangeAffectedPosture {
|
||||
func (am *DefaultAccountManager) syncPeerAffectedPeers(ctx context.Context, accountID, peerID string, nmap *types.NetworkMap, peerNotValid, metaUpdated, hasPostureChecks bool) []string {
|
||||
if peerNotValid || (metaUpdated && hasPostureChecks) {
|
||||
return am.resolveAffectedPeersForPeerChanges(ctx, am.Store, accountID, []string{peerID})
|
||||
}
|
||||
return affectedPeerIDsFromNetworkMap(nmap, peerID)
|
||||
@@ -1182,7 +1170,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
||||
}
|
||||
|
||||
// This is needed to keep in memory for the peer config. Otherwise browser client will end in a retry loop
|
||||
peer.Meta = login.Meta
|
||||
peer.UpdateMetaIfNew(ctx, login.Meta)
|
||||
|
||||
peerGroupIDs, err = getPeerGroupIDs(ctx, am.Store, accountID, peer.ID)
|
||||
if err != nil {
|
||||
|
||||
@@ -256,18 +256,14 @@ func (p *Peer) Copy() *Peer {
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateMetaIfNew updates peer's system metadata and connection geo location if
|
||||
// new information is provided. newLocation is the geo location resolved from the
|
||||
// peer's current connection IP, or nil when there is nothing to apply (geo
|
||||
// disabled, no real IP, or the IP is unchanged); the caller owns the expensive
|
||||
// lookup and the same-IP guard. It returns a MetaDiff describing what changed;
|
||||
// diff.Updated() reports whether the peer needs to be persisted.
|
||||
func (p *Peer) UpdateMetaIfNew(ctx context.Context, meta PeerSystemMeta, newLocation *Location) MetaDiff {
|
||||
// UpdateMetaIfNew updates peer's system metadata if new information is provided
|
||||
// returns true if meta was updated, false otherwise
|
||||
func (p *Peer) UpdateMetaIfNew(ctx context.Context, meta PeerSystemMeta) (updated, versionChanged bool) {
|
||||
if meta.isEmpty() {
|
||||
return MetaDiff{}
|
||||
return updated, versionChanged
|
||||
}
|
||||
|
||||
versionChanged := p.Meta.WtVersion != meta.WtVersion
|
||||
versionChanged = p.Meta.WtVersion != meta.WtVersion
|
||||
|
||||
// Avoid overwriting UIVersion if the update was triggered sole by the CLI client
|
||||
if meta.UIVersion == "" {
|
||||
@@ -276,177 +272,97 @@ func (p *Peer) UpdateMetaIfNew(ctx context.Context, meta PeerSystemMeta, newLoca
|
||||
|
||||
oldVersion := p.Meta.WtVersion
|
||||
|
||||
diff := diffMeta(p.Meta, meta)
|
||||
if diff.Any() {
|
||||
diff := metaDiff(p.Meta, meta)
|
||||
if len(diff) != 0 {
|
||||
p.Meta = meta
|
||||
}
|
||||
diff.VersionChanged = versionChanged
|
||||
|
||||
locationInfo := ""
|
||||
if newLocation != nil {
|
||||
p.Location = *newLocation
|
||||
diff.LocationChanged = true
|
||||
locationInfo = fmt.Sprintf("location changed to %s, ", newLocation.ConnectionIP)
|
||||
updated = true
|
||||
}
|
||||
|
||||
versionInfo := ""
|
||||
if diff.VersionChanged {
|
||||
if versionChanged {
|
||||
versionInfo = fmt.Sprintf("version changed: %s -> %s, ", oldVersion, meta.WtVersion)
|
||||
}
|
||||
|
||||
if diff.Any() || diff.VersionChanged || diff.LocationChanged {
|
||||
if len(diff) > 0 || versionChanged {
|
||||
log.WithContext(ctx).
|
||||
Debugf("peer meta updated, %s%s%d field(s) changed: %s", versionInfo, locationInfo, len(diff.Changed), strings.Join(diff.Changed, ", "))
|
||||
Debugf("peer meta updated, %s%d field(s) changed: %s", versionInfo, len(diff), strings.Join(diff, ", "))
|
||||
}
|
||||
|
||||
return diff
|
||||
}
|
||||
|
||||
// MetaDiff records which PeerSystemMeta fields differ between two metas. Each bool
|
||||
// maps to a single struct field, except Environment, which is split into Cloud and
|
||||
// Platform. Changed holds the human-readable `field: <old> -> <new>` entries so the
|
||||
// existing log line and isEqual can be derived from the same comparison.
|
||||
//
|
||||
// VersionChanged and LocationChanged sit outside the per-meta-field set:
|
||||
// VersionChanged tracks the WireGuard client version specifically (compared before
|
||||
// the UIVersion fixup, to signal client upgrades) and LocationChanged tracks the
|
||||
// peer's connection geo location, which lives on Peer rather than PeerSystemMeta.
|
||||
// Neither contributes an entry to Changed, so the field-coverage accounting stays
|
||||
// driven purely by the PeerSystemMeta comparison.
|
||||
type MetaDiff struct {
|
||||
Hostname bool
|
||||
GoOS bool
|
||||
Kernel bool
|
||||
KernelVersion bool
|
||||
Core bool
|
||||
Platform bool
|
||||
OS bool
|
||||
OSVersion bool
|
||||
WtVersion bool
|
||||
UIVersion bool
|
||||
SystemSerialNumber bool
|
||||
SystemProductName bool
|
||||
SystemManufacturer bool
|
||||
EnvironmentCloud bool
|
||||
EnvironmentPlatform bool
|
||||
Flags bool
|
||||
Capabilities bool
|
||||
NetworkAddresses bool
|
||||
Files bool
|
||||
|
||||
VersionChanged bool
|
||||
LocationChanged bool
|
||||
|
||||
Changed []string
|
||||
}
|
||||
|
||||
// Any reports whether any PeerSystemMeta field changed.
|
||||
func (d MetaDiff) Any() bool {
|
||||
return len(d.Changed) != 0
|
||||
}
|
||||
|
||||
// Updated reports whether the peer needs to be persisted: any meta field changed
|
||||
// or the geo location changed. The version flag alone does not imply a write,
|
||||
// since a version change is also reflected in the WtVersion meta field.
|
||||
func (d MetaDiff) Updated() bool {
|
||||
return d.Any() || d.LocationChanged || d.VersionChanged
|
||||
return updated, versionChanged
|
||||
}
|
||||
|
||||
// metaDiff returns a human-readable list of the fields that differ between the
|
||||
// old and new meta, each formatted as `field: <old> -> <new>`. It is the single
|
||||
// source of truth for meta comparison: isEqual reports equality as an empty
|
||||
// diff, so the log line can never disagree with the change decision. Slices are
|
||||
// cloned before sorting, so callers' meta is not mutated.
|
||||
func metaDiff(oldMeta, newMeta PeerSystemMeta) []string {
|
||||
return diffMeta(oldMeta, newMeta).Changed
|
||||
}
|
||||
|
||||
// diffMeta compares two metas field by field, returning both a per-field flag set
|
||||
// (for callers that need to know exactly what changed, e.g. matching against
|
||||
// posture checks) and the human-readable Changed list. It is the single source of
|
||||
// truth for meta comparison: isEqual reports equality as an empty diff, so the log
|
||||
// line, the change decision, and the flags can never disagree.
|
||||
func diffMeta(oldMeta, newMeta PeerSystemMeta) MetaDiff {
|
||||
var d MetaDiff
|
||||
var diff []string
|
||||
add := func(field string, oldVal, newVal any) {
|
||||
d.Changed = append(d.Changed, fmt.Sprintf("%s: %v -> %v", field, oldVal, newVal))
|
||||
diff = append(diff, fmt.Sprintf("%s: %v -> %v", field, oldVal, newVal))
|
||||
}
|
||||
|
||||
if oldMeta.Hostname != newMeta.Hostname {
|
||||
d.Hostname = true
|
||||
add("hostname", oldMeta.Hostname, newMeta.Hostname)
|
||||
}
|
||||
if oldMeta.GoOS != newMeta.GoOS {
|
||||
d.GoOS = true
|
||||
add("goos", oldMeta.GoOS, newMeta.GoOS)
|
||||
}
|
||||
if oldMeta.Kernel != newMeta.Kernel {
|
||||
d.Kernel = true
|
||||
add("kernel", oldMeta.Kernel, newMeta.Kernel)
|
||||
}
|
||||
if oldMeta.KernelVersion != newMeta.KernelVersion {
|
||||
d.KernelVersion = true
|
||||
add("kernel_version", oldMeta.KernelVersion, newMeta.KernelVersion)
|
||||
}
|
||||
if oldMeta.Core != newMeta.Core {
|
||||
d.Core = true
|
||||
add("core", oldMeta.Core, newMeta.Core)
|
||||
}
|
||||
if oldMeta.Platform != newMeta.Platform {
|
||||
d.Platform = true
|
||||
add("platform", oldMeta.Platform, newMeta.Platform)
|
||||
}
|
||||
if oldMeta.OS != newMeta.OS {
|
||||
d.OS = true
|
||||
add("os", oldMeta.OS, newMeta.OS)
|
||||
}
|
||||
if oldMeta.OSVersion != newMeta.OSVersion {
|
||||
d.OSVersion = true
|
||||
add("os_version", oldMeta.OSVersion, newMeta.OSVersion)
|
||||
}
|
||||
if oldMeta.WtVersion != newMeta.WtVersion {
|
||||
d.WtVersion = true
|
||||
add("wt_version", oldMeta.WtVersion, newMeta.WtVersion)
|
||||
}
|
||||
if oldMeta.UIVersion != newMeta.UIVersion {
|
||||
d.UIVersion = true
|
||||
add("ui_version", oldMeta.UIVersion, newMeta.UIVersion)
|
||||
}
|
||||
if oldMeta.SystemSerialNumber != newMeta.SystemSerialNumber {
|
||||
d.SystemSerialNumber = true
|
||||
add("system_serial_number", oldMeta.SystemSerialNumber, newMeta.SystemSerialNumber)
|
||||
}
|
||||
if oldMeta.SystemProductName != newMeta.SystemProductName {
|
||||
d.SystemProductName = true
|
||||
add("system_product_name", oldMeta.SystemProductName, newMeta.SystemProductName)
|
||||
}
|
||||
if oldMeta.SystemManufacturer != newMeta.SystemManufacturer {
|
||||
d.SystemManufacturer = true
|
||||
add("system_manufacturer", oldMeta.SystemManufacturer, newMeta.SystemManufacturer)
|
||||
}
|
||||
if oldMeta.Environment.Cloud != newMeta.Environment.Cloud {
|
||||
d.EnvironmentCloud = true
|
||||
add("environment_cloud", oldMeta.Environment.Cloud, newMeta.Environment.Cloud)
|
||||
}
|
||||
if oldMeta.Environment.Platform != newMeta.Environment.Platform {
|
||||
d.EnvironmentPlatform = true
|
||||
add("environment_platform", oldMeta.Environment.Platform, newMeta.Environment.Platform)
|
||||
}
|
||||
if !oldMeta.Flags.isEqual(newMeta.Flags) {
|
||||
d.Flags = true
|
||||
add("flags", fmt.Sprintf("%+v", oldMeta.Flags), fmt.Sprintf("%+v", newMeta.Flags))
|
||||
}
|
||||
if !capabilitiesEqual(oldMeta.Capabilities, newMeta.Capabilities) {
|
||||
d.Capabilities = true
|
||||
add("capabilities", oldMeta.Capabilities, newMeta.Capabilities)
|
||||
}
|
||||
|
||||
if !sameMultiset(oldMeta.NetworkAddresses, newMeta.NetworkAddresses) {
|
||||
d.NetworkAddresses = true
|
||||
add("network_addresses", fmt.Sprintf("%v", oldMeta.NetworkAddresses), fmt.Sprintf("%v", newMeta.NetworkAddresses))
|
||||
}
|
||||
|
||||
if !sameMultiset(oldMeta.Files, newMeta.Files) {
|
||||
d.Files = true
|
||||
add("files", fmt.Sprintf("%v", oldMeta.Files), fmt.Sprintf("%v", newMeta.Files))
|
||||
}
|
||||
|
||||
return d
|
||||
return diff
|
||||
}
|
||||
|
||||
// sameMultiset reports whether two slices contain the same elements with the
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"regexp"
|
||||
|
||||
"github.com/hashicorp/go-version"
|
||||
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
@@ -52,34 +51,6 @@ type Checks struct {
|
||||
Checks ChecksDefinition `gorm:"serializer:json"`
|
||||
}
|
||||
|
||||
// AffectsPosture reports whether the peer metadata changes described by diff can
|
||||
// alter the outcome of any of the given posture checks. It maps each check kind to
|
||||
// the metadata fields it inspects, so an unrelated change (e.g. a hostname update)
|
||||
// does not force a posture re-evaluation.
|
||||
func AffectsPosture(diff *nbpeer.MetaDiff, checks []*Checks) bool {
|
||||
if diff == nil {
|
||||
return false
|
||||
}
|
||||
for _, c := range checks {
|
||||
if c.Checks.ProcessCheck != nil && diff.Files {
|
||||
return true
|
||||
}
|
||||
if c.Checks.OSVersionCheck != nil && (diff.OSVersion || diff.OS || diff.KernelVersion) {
|
||||
return true
|
||||
}
|
||||
if c.Checks.NBVersionCheck != nil && diff.WtVersion {
|
||||
return true
|
||||
}
|
||||
if c.Checks.GeoLocationCheck != nil && diff.LocationChanged {
|
||||
return true
|
||||
}
|
||||
if c.Checks.PeerNetworkRangeCheck != nil && diff.NetworkAddresses {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// ChecksDefinition contains definition of actual check
|
||||
type ChecksDefinition struct {
|
||||
NBVersionCheck *NBVersionCheck `json:",omitempty"`
|
||||
|
||||
@@ -581,6 +581,28 @@ func (s *SqlStore) MarkPeerDisconnectedIfSameSession(ctx context.Context, accoun
|
||||
return result.RowsAffected > 0, nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) SavePeerLocation(ctx context.Context, accountID string, peerWithLocation *nbpeer.Peer) error {
|
||||
// To maintain data integrity, we create a copy of the peer's location to prevent unintended updates to other fields.
|
||||
var peerCopy nbpeer.Peer
|
||||
// Since the location field has been migrated to JSON serialization,
|
||||
// updating the struct ensures the correct data format is inserted into the database.
|
||||
peerCopy.Location = peerWithLocation.Location
|
||||
|
||||
result := s.db.Model(&nbpeer.Peer{}).
|
||||
Where(accountAndIDQueryCondition, accountID, peerWithLocation.ID).
|
||||
Updates(peerCopy)
|
||||
|
||||
if result.Error != nil {
|
||||
return status.Errorf(status.Internal, "failed to save peer locations to store: %v", result.Error)
|
||||
}
|
||||
|
||||
if result.RowsAffected == 0 {
|
||||
return status.Errorf(status.NotFound, peerNotFoundFMT, peerWithLocation.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApproveAccountPeers marks all peers that currently require approval in the given account as approved.
|
||||
func (s *SqlStore) ApproveAccountPeers(ctx context.Context, accountID string) (int, error) {
|
||||
result := s.db.Model(&nbpeer.Peer{}).
|
||||
|
||||
@@ -618,6 +618,56 @@ func TestSqlStore_SavePeerStatus(t *testing.T) {
|
||||
assert.WithinDurationf(t, newStatus.LastSeen, actual.LastSeen.UTC(), time.Millisecond, "LastSeen should be equal")
|
||||
}
|
||||
|
||||
func TestSqlStore_SavePeerLocation(t *testing.T) {
|
||||
store, cleanUp, err := NewTestStoreFromSQL(context.Background(), "../testdata/store.sql", t.TempDir())
|
||||
t.Cleanup(cleanUp)
|
||||
assert.NoError(t, err)
|
||||
|
||||
account, err := store.GetAccount(context.Background(), "bf1c8084-ba50-4ce7-9439-34653001fc3b")
|
||||
require.NoError(t, err)
|
||||
|
||||
peer := &nbpeer.Peer{
|
||||
AccountID: account.Id,
|
||||
ID: "testpeer",
|
||||
Location: nbpeer.Location{
|
||||
ConnectionIP: net.ParseIP("0.0.0.0"),
|
||||
CountryCode: "YY",
|
||||
CityName: "City",
|
||||
GeoNameID: 1,
|
||||
},
|
||||
CreatedAt: time.Now().UTC(),
|
||||
Meta: nbpeer.PeerSystemMeta{},
|
||||
}
|
||||
// error is expected as peer is not in store yet
|
||||
err = store.SavePeerLocation(context.Background(), account.Id, peer)
|
||||
assert.Error(t, err)
|
||||
|
||||
account.Peers[peer.ID] = peer
|
||||
err = store.SaveAccount(context.Background(), account)
|
||||
require.NoError(t, err)
|
||||
|
||||
peer.Location.ConnectionIP = net.ParseIP("35.1.1.1")
|
||||
peer.Location.CountryCode = "DE"
|
||||
peer.Location.CityName = "Berlin"
|
||||
peer.Location.GeoNameID = 2950159
|
||||
|
||||
err = store.SavePeerLocation(context.Background(), account.Id, account.Peers[peer.ID])
|
||||
assert.NoError(t, err)
|
||||
|
||||
account, err = store.GetAccount(context.Background(), account.Id)
|
||||
require.NoError(t, err)
|
||||
|
||||
actual := account.Peers[peer.ID].Location
|
||||
assert.Equal(t, peer.Location, actual)
|
||||
|
||||
peer.ID = "non-existing-peer"
|
||||
err = store.SavePeerLocation(context.Background(), account.Id, peer)
|
||||
assert.Error(t, err)
|
||||
parsedErr, ok := status.FromError(err)
|
||||
require.True(t, ok)
|
||||
require.Equal(t, status.NotFound, parsedErr.Type(), "should return not found error")
|
||||
}
|
||||
|
||||
func Test_TestGetAccountByPrivateDomain(t *testing.T) {
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("The SQLite store is not properly supported by Windows yet")
|
||||
|
||||
@@ -185,6 +185,7 @@ type Store interface {
|
||||
// recorded by the database. Returns true when the update happened,
|
||||
// false when a newer session has taken over.
|
||||
MarkPeerDisconnectedIfSameSession(ctx context.Context, accountID, peerID string, sessionStartedAt int64) (bool, error)
|
||||
SavePeerLocation(ctx context.Context, accountID string, peer *nbpeer.Peer) error
|
||||
ApproveAccountPeers(ctx context.Context, accountID string) (int, error)
|
||||
DeletePeer(ctx context.Context, accountID string, peerID string) error
|
||||
|
||||
|
||||
@@ -2968,6 +2968,20 @@ func (mr *MockStoreMockRecorder) SavePeer(ctx, accountID, peer interface{}) *gom
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SavePeer", reflect.TypeOf((*MockStore)(nil).SavePeer), ctx, accountID, peer)
|
||||
}
|
||||
|
||||
// SavePeerLocation mocks base method.
|
||||
func (m *MockStore) SavePeerLocation(ctx context.Context, accountID string, peer *peer.Peer) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SavePeerLocation", ctx, accountID, peer)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SavePeerLocation indicates an expected call of SavePeerLocation.
|
||||
func (mr *MockStoreMockRecorder) SavePeerLocation(ctx, accountID, peer interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SavePeerLocation", reflect.TypeOf((*MockStore)(nil).SavePeerLocation), ctx, accountID, peer)
|
||||
}
|
||||
|
||||
// SavePeerStatus mocks base method.
|
||||
func (m *MockStore) SavePeerStatus(ctx context.Context, accountID, peerID string, status peer.PeerStatus) error {
|
||||
m.ctrl.T.Helper()
|
||||
|
||||
@@ -12,9 +12,6 @@ type PeerSync struct {
|
||||
WireGuardPubKey string
|
||||
// Meta is the system information passed by peer, must be always present
|
||||
Meta nbpeer.PeerSystemMeta
|
||||
// RealIP is the peer's connection IP, used to refresh its geo location.
|
||||
// May be nil when the request has no associated connection IP.
|
||||
RealIP net.IP
|
||||
// UpdateAccountPeers indicate updating account peers,
|
||||
// which occurs when the peer's metadata is updated
|
||||
UpdateAccountPeers bool
|
||||
|
||||
@@ -1847,12 +1847,17 @@ func (am *DefaultAccountManager) DeleteUserInvite(ctx context.Context, accountID
|
||||
|
||||
const minPasswordLength = 8
|
||||
|
||||
// validatePassword checks password strength requirements:
|
||||
// validatePassword checks password strength requirements.
|
||||
func validatePassword(password string) error {
|
||||
return ValidatePassword(password)
|
||||
}
|
||||
|
||||
// ValidatePassword checks password strength requirements:
|
||||
// - Minimum 8 characters
|
||||
// - At least 1 digit
|
||||
// - At least 1 uppercase letter
|
||||
// - At least 1 special character
|
||||
func validatePassword(password string) error {
|
||||
func ValidatePassword(password string) error {
|
||||
if len(password) < minPasswordLength {
|
||||
return errors.New("password must be at least 8 characters long")
|
||||
}
|
||||
|
||||
@@ -243,11 +243,7 @@ func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore
|
||||
|
||||
// Connect establishes a connection to the relay server. It blocks until the connection is established or an error occurs.
|
||||
func (c *Client) Connect(ctx context.Context) error {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
c.log.Infof("connect elapsed time: %v", time.Since(start))
|
||||
}()
|
||||
|
||||
c.log.Infof("connecting to relay server")
|
||||
c.readLoopMutex.Lock()
|
||||
defer c.readLoopMutex.Unlock()
|
||||
|
||||
@@ -291,11 +287,6 @@ func (c *Client) Connect(ctx context.Context) error {
|
||||
func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, error) {
|
||||
peerID := messages.HashID(dstPeerID)
|
||||
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
c.log.Infof("connect elapsed time: %v", time.Since(start))
|
||||
}()
|
||||
|
||||
c.mu.Lock()
|
||||
if !c.serviceIsRunning {
|
||||
c.mu.Unlock()
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface"
|
||||
@@ -253,7 +252,7 @@ func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &Client{log: log.WithField("relay", tt.name), relayConn: stubConn{remote: staticAddr{s: tt.s}}}
|
||||
c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}}
|
||||
got := c.ConnectedIP()
|
||||
var gotStr string
|
||||
if got.IsValid() {
|
||||
|
||||
@@ -78,23 +78,6 @@ type GrpcClient struct {
|
||||
// transport-alive but no longer delivering messages. It is the source of
|
||||
// truth IsHealthy reads, and is cleared once any frame is received again.
|
||||
receiveStalled atomic.Bool
|
||||
// receiveHandoffBlocked is set while the receive loop is parked handing a
|
||||
// message to a busy decryption worker. The loop stops calling Recv (and
|
||||
// markReceived) in that window, so the stream looks silent though it is
|
||||
// healthy. The watchdog reads this to avoid misreading self-inflicted
|
||||
// receive backpressure as a dead stream: reconnecting cannot help, since the
|
||||
// new stream feeds the same worker, and only triggers a reconnect storm.
|
||||
receiveHandoffBlocked atomic.Bool
|
||||
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
|
||||
// worker pulled off its queue. Diagnostic only: it lets a stall log show
|
||||
// whether the worker was draining (busy) or idle when the stream went silent.
|
||||
lastDecrypt atomic.Int64
|
||||
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
|
||||
// time the receive loop spent blocked handing messages to the worker. This is
|
||||
// time not spent reading the stream, so it quantifies receive backpressure.
|
||||
handoffWaitTotal atomic.Int64
|
||||
handoffWaitMax atomic.Int64
|
||||
handoffWaitCount atomic.Int64
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -370,8 +353,6 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
c.lastDecrypt.Store(time.Now().UnixNano())
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -458,22 +439,6 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastReceived.Load()))
|
||||
}
|
||||
|
||||
// idleSinceDecrypt returns how long since the worker last pulled a message.
|
||||
// Diagnostic only: distinguishes a busy/wedged worker from an idle one.
|
||||
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastDecrypt.Load()))
|
||||
}
|
||||
|
||||
// receiveAlive reports whether the receive stream shows liveness: it delivered a
|
||||
// frame within the inactivity threshold, or the receive loop is currently parked
|
||||
// handing a message to a busy decryption worker. In the latter case the loop has
|
||||
// stopped calling Recv, so the stream looks silent while being healthy, and
|
||||
// reconnecting would not help, so the watchdog must treat it as alive.
|
||||
func (c *GrpcClient) receiveAlive() bool {
|
||||
return c.idleSinceReceive() < receiveInactivityThreshold ||
|
||||
c.receiveHandoffBlocked.Load()
|
||||
}
|
||||
|
||||
// watchReceiveStream guards against a receive stream that is transport-alive but
|
||||
// no longer delivering messages. While the stream is idle past
|
||||
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
|
||||
@@ -485,55 +450,18 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
defer ticker.Stop()
|
||||
|
||||
var probeSentAt time.Time
|
||||
var holdLogged bool
|
||||
var statTicks int
|
||||
var lastStatTotal int64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Periodic backpressure summary so time lost to the worker handoff is
|
||||
// visible even when no stall fires. Emitted ~once a minute and only
|
||||
// when the wait grew, to stay quiet on a healthy stream.
|
||||
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
|
||||
statTicks = 0
|
||||
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
|
||||
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
|
||||
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
|
||||
max.Round(time.Millisecond), count)
|
||||
lastStatTotal = int64(total)
|
||||
}
|
||||
}
|
||||
|
||||
if c.receiveAlive() {
|
||||
// Attribute the case that matters in the field: silent past the
|
||||
// threshold but held because the receive loop is parked on the
|
||||
// worker handoff (backpressure), not a dead stream. Log once per
|
||||
// hold episode so a persistent worker stall is visible at info.
|
||||
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
|
||||
if !holdLogged {
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
holdLogged = true
|
||||
}
|
||||
} else {
|
||||
holdLogged = false
|
||||
}
|
||||
if c.idleSinceReceive() < receiveInactivityThreshold {
|
||||
probeSentAt = time.Time{}
|
||||
continue
|
||||
}
|
||||
holdLogged = false
|
||||
|
||||
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
|
||||
c.receiveStalled.Store(true)
|
||||
c.notifyDisconnected(errReceiveStreamStalled)
|
||||
cancelStream()
|
||||
@@ -589,37 +517,12 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
|
||||
continue
|
||||
}
|
||||
|
||||
// The handoff blocks while the worker is busy, which parks this loop and
|
||||
// stops Recv. Flag it so the watchdog does not read the resulting silence
|
||||
// as a dead stream, and account the wait as receive backpressure.
|
||||
handoffStart := time.Now()
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.recordHandoffWait(time.Since(handoffStart))
|
||||
}
|
||||
}
|
||||
|
||||
// recordHandoffWait accumulates the time the receive loop was blocked handing a
|
||||
// message to the worker.
|
||||
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
|
||||
c.handoffWaitTotal.Add(int64(d))
|
||||
c.handoffWaitCount.Add(1)
|
||||
for {
|
||||
cur := c.handoffWaitMax.Load()
|
||||
if int64(d) <= cur || c.handoffWaitMax.CompareAndSwap(cur, int64(d)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handoffWaitStats returns cumulative receive-loop handoff backpressure.
|
||||
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
|
||||
return time.Duration(c.handoffWaitTotal.Load()), time.Duration(c.handoffWaitMax.Load()), c.handoffWaitCount.Load()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
|
||||
if c.decryptionWorker != nil {
|
||||
return
|
||||
|
||||
@@ -82,27 +82,3 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
|
||||
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
|
||||
// where a busy decryption worker parks the receive loop on the worker handoff,
|
||||
// so Recv (and markReceived) stops firing even though the stream is healthy.
|
||||
// With the receive stream silent past the inactivity threshold but the loop
|
||||
// blocked on handoff, the watchdog must consider the stream alive rather than
|
||||
// tear it down (reconnecting feeds the same worker and would not help).
|
||||
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
|
||||
c := &GrpcClient{}
|
||||
|
||||
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
|
||||
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
|
||||
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
|
||||
|
||||
// Receive stream silent but the loop is parked handing a message to a busy
|
||||
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
|
||||
|
||||
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.markReceived()
|
||||
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
|
||||
}
|
||||
|
||||
@@ -32,13 +32,6 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueLen returns the number of messages buffered for decryption. Diagnostic
|
||||
// only: a non-empty queue while the receive stream is silent indicates the
|
||||
// receive loop is parked on the handoff rather than the stream being dead.
|
||||
func (w *Worker) QueueLen() int {
|
||||
return len(w.encryptedMsgPool)
|
||||
}
|
||||
|
||||
func (w *Worker) Work(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user