mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-25 01:09:54 +00:00
Compare commits
7 Commits
client-jso
...
dmitri-fil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8e7f2e9e6 | ||
|
|
1205641b44 | ||
|
|
56e8215ebe | ||
|
|
9b768d1773 | ||
|
|
33954ea15e | ||
|
|
4c4434a871 | ||
|
|
7873f337df |
@@ -5,7 +5,6 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -23,21 +22,15 @@ var serviceCmd = &cobra.Command{
|
||||
Short: "Manage the NetBird daemon service",
|
||||
}
|
||||
|
||||
const defaultJSONSocket = "unix:///var/run/netbird-http.sock"
|
||||
|
||||
var (
|
||||
serviceName string
|
||||
serviceEnvVars []string
|
||||
jsonSocket string
|
||||
jsonSocketDisabled bool
|
||||
serviceName string
|
||||
serviceEnvVars []string
|
||||
)
|
||||
|
||||
type program struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
serv *grpc.Server
|
||||
jsonServ *http.Server
|
||||
jsonServMu sync.Mutex
|
||||
serverInstance *server.Server
|
||||
serverInstanceMu sync.Mutex
|
||||
}
|
||||
@@ -53,8 +46,6 @@ func init() {
|
||||
serviceCmd.PersistentFlags().BoolVar(&updateSettingsDisabled, "disable-update-settings", false, "Disables update settings feature. If enabled, the client will not be able to change or edit any settings. To persist this setting, use: netbird service install --disable-update-settings")
|
||||
serviceCmd.PersistentFlags().BoolVar(&captureEnabled, "enable-capture", false, "Enables packet capture via 'netbird debug capture'. To persist, use: netbird service install --enable-capture")
|
||||
serviceCmd.PersistentFlags().BoolVar(&networksDisabled, "disable-networks", false, "Disables network selection. If enabled, the client will not allow listing, selecting, or deselecting networks. To persist, use: netbird service install --disable-networks")
|
||||
serviceCmd.PersistentFlags().StringVar(&jsonSocket, "json-socket", defaultJSONSocket, "HTTP/JSON API socket address served by grpc-gateway [unix|tcp]://[path|host:port]. To persist, use: netbird service install --json-socket")
|
||||
serviceCmd.PersistentFlags().BoolVar(&jsonSocketDisabled, "disable-json-socket", false, "Disables the HTTP/JSON API socket. To persist, use: netbird service install --disable-json-socket")
|
||||
|
||||
rootCmd.PersistentFlags().StringVarP(&serviceName, "service", "s", defaultServiceName, "Netbird system service name")
|
||||
serviceEnvDesc := `Sets extra environment variables for the service. ` +
|
||||
|
||||
@@ -5,6 +5,9 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/kardianos/service"
|
||||
@@ -29,35 +32,31 @@ func (p *program) Start(svc service.Service) error {
|
||||
// in any case, even if configuration does not exists we run daemon to serve CLI gRPC API.
|
||||
p.serv = grpc.NewServer()
|
||||
|
||||
daemonListener, err := listenOnAddress(daemonAddr)
|
||||
split := strings.Split(daemonAddr, "://")
|
||||
switch split[0] {
|
||||
case "unix":
|
||||
// cleanup failed close
|
||||
stat, err := os.Stat(split[1])
|
||||
if err == nil && !stat.IsDir() {
|
||||
if err := os.Remove(split[1]); err != nil {
|
||||
log.Debugf("remove socket file: %v", err)
|
||||
}
|
||||
}
|
||||
case "tcp":
|
||||
default:
|
||||
return fmt.Errorf("unsupported daemon address protocol: %v", split[0])
|
||||
}
|
||||
|
||||
listen, err := net.Listen(split[0], split[1])
|
||||
if err != nil {
|
||||
return fmt.Errorf("listen daemon interface: %w", err)
|
||||
}
|
||||
|
||||
var jsonListener *socketListener
|
||||
if !jsonSocketDisabled {
|
||||
jsonListener, err = listenOnAddress(jsonSocket)
|
||||
if err != nil {
|
||||
_ = daemonListener.Close()
|
||||
return fmt.Errorf("listen daemon JSON interface: %w", err)
|
||||
}
|
||||
} else {
|
||||
removeStaleUnixSocketForAddress(jsonSocket)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer daemonListener.Close()
|
||||
if jsonListener != nil {
|
||||
defer jsonListener.Close()
|
||||
}
|
||||
defer listen.Close()
|
||||
|
||||
if err := daemonListener.chmodUnixSocket("daemon"); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
if jsonListener != nil {
|
||||
if err := jsonListener.chmodUnixSocket("daemon JSON"); err != nil {
|
||||
log.Error(err)
|
||||
if split[0] == "unix" {
|
||||
if err := os.Chmod(split[1], 0666); err != nil {
|
||||
log.Errorf("failed setting daemon permissions: %v", split[1])
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -72,16 +71,8 @@ func (p *program) Start(svc service.Service) error {
|
||||
p.serverInstance = serverInstance
|
||||
p.serverInstanceMu.Unlock()
|
||||
|
||||
if jsonListener != nil {
|
||||
if err := p.startJSONGateway(jsonListener, daemonAddr); err != nil {
|
||||
log.Fatalf("failed to start daemon JSON server: %v", err)
|
||||
}
|
||||
} else {
|
||||
log.Debug("daemon JSON socket disabled")
|
||||
}
|
||||
|
||||
log.Printf("started daemon server: %v", daemonListener.address)
|
||||
if err := p.serv.Serve(daemonListener.Listener); err != nil {
|
||||
log.Printf("started daemon server: %v", split[1])
|
||||
if err := p.serv.Serve(listen); err != nil {
|
||||
log.Errorf("failed to serve daemon requests: %v", err)
|
||||
}
|
||||
}()
|
||||
@@ -101,20 +92,6 @@ func (p *program) Stop(srv service.Service) error {
|
||||
|
||||
p.cancel()
|
||||
|
||||
p.jsonServMu.Lock()
|
||||
jsonServ := p.jsonServ
|
||||
p.jsonServMu.Unlock()
|
||||
if jsonServ != nil {
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
if err := jsonServ.Shutdown(shutdownCtx); err != nil {
|
||||
log.Errorf("failed to stop daemon JSON server gracefully: %v", err)
|
||||
if err := jsonServ.Close(); err != nil {
|
||||
log.Errorf("failed to close daemon JSON server: %v", err)
|
||||
}
|
||||
}
|
||||
shutdownCancel()
|
||||
}
|
||||
|
||||
if p.serv != nil {
|
||||
p.serv.Stop()
|
||||
}
|
||||
|
||||
@@ -67,11 +67,6 @@ func buildServiceArguments() []string {
|
||||
args = append(args, "--disable-networks")
|
||||
}
|
||||
|
||||
args = append(args, "--json-socket", jsonSocket)
|
||||
if jsonSocketDisabled {
|
||||
args = append(args, "--disable-json-socket")
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
|
||||
@@ -1,52 +0,0 @@
|
||||
//go:build !ios && !android
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
"github.com/netbirdio/netbird/client/proto"
|
||||
)
|
||||
|
||||
func grpcGatewayEndpoint(addr string) string {
|
||||
return strings.TrimPrefix(addr, "tcp://")
|
||||
}
|
||||
|
||||
func (p *program) startJSONGateway(jsonListener *socketListener, daemonEndpoint string) error {
|
||||
mux := runtime.NewServeMux()
|
||||
opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
|
||||
if err := proto.RegisterDaemonServiceHandlerFromEndpoint(p.ctx, mux, grpcGatewayEndpoint(daemonEndpoint), opts); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jsonServer := &http.Server{
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
BaseContext: func(net.Listener) context.Context {
|
||||
return p.ctx
|
||||
},
|
||||
}
|
||||
|
||||
p.jsonServMu.Lock()
|
||||
p.jsonServ = jsonServer
|
||||
p.jsonServMu.Unlock()
|
||||
|
||||
go func() {
|
||||
log.Printf("started daemon JSON server: %v", jsonListener.address)
|
||||
if err := jsonServer.Serve(jsonListener.Listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
log.Errorf("failed to serve daemon JSON requests: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -23,7 +23,6 @@ const serviceParamsFile = "service.json"
|
||||
type serviceParams struct {
|
||||
LogLevel string `json:"log_level"`
|
||||
DaemonAddr string `json:"daemon_addr"`
|
||||
JSONSocket string `json:"json_socket"`
|
||||
ManagementURL string `json:"management_url,omitempty"`
|
||||
ConfigPath string `json:"config_path,omitempty"`
|
||||
LogFiles []string `json:"log_files,omitempty"`
|
||||
@@ -31,7 +30,6 @@ type serviceParams struct {
|
||||
DisableUpdateSettings bool `json:"disable_update_settings,omitempty"`
|
||||
EnableCapture bool `json:"enable_capture,omitempty"`
|
||||
DisableNetworks bool `json:"disable_networks,omitempty"`
|
||||
DisableJSONSocket bool `json:"disable_json_socket,omitempty"`
|
||||
ServiceEnvVars map[string]string `json:"service_env_vars,omitempty"`
|
||||
}
|
||||
|
||||
@@ -77,7 +75,6 @@ func currentServiceParams() *serviceParams {
|
||||
params := &serviceParams{
|
||||
LogLevel: logLevel,
|
||||
DaemonAddr: daemonAddr,
|
||||
JSONSocket: jsonSocket,
|
||||
ManagementURL: managementURL,
|
||||
ConfigPath: configPath,
|
||||
LogFiles: logFiles,
|
||||
@@ -85,7 +82,6 @@ func currentServiceParams() *serviceParams {
|
||||
DisableUpdateSettings: updateSettingsDisabled,
|
||||
EnableCapture: captureEnabled,
|
||||
DisableNetworks: networksDisabled,
|
||||
DisableJSONSocket: jsonSocketDisabled,
|
||||
}
|
||||
|
||||
if len(serviceEnvVars) > 0 {
|
||||
@@ -117,8 +113,9 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
|
||||
return
|
||||
}
|
||||
|
||||
// For fields with non-empty defaults, keep the != "" guard so that an older
|
||||
// service.json missing the field doesn't clobber the default with an empty string.
|
||||
// For fields with non-empty defaults (log-level, daemon-addr), keep the
|
||||
// != "" guard so that an older service.json missing the field doesn't
|
||||
// clobber the default with an empty string.
|
||||
if !rootCmd.PersistentFlags().Changed("log-level") && params.LogLevel != "" {
|
||||
logLevel = params.LogLevel
|
||||
}
|
||||
@@ -127,20 +124,6 @@ func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
|
||||
daemonAddr = params.DaemonAddr
|
||||
}
|
||||
|
||||
jsonSocketChanged := serviceCmd.PersistentFlags().Changed("json-socket")
|
||||
if !jsonSocketChanged && params.JSONSocket != "" {
|
||||
jsonSocket = params.JSONSocket
|
||||
}
|
||||
|
||||
if !serviceCmd.PersistentFlags().Changed("disable-json-socket") {
|
||||
jsonSocketDisabled = params.DisableJSONSocket
|
||||
// Passing --json-socket should re-enable the JSON gateway unless
|
||||
// --disable-json-socket was explicitly provided too.
|
||||
if jsonSocketChanged {
|
||||
jsonSocketDisabled = false
|
||||
}
|
||||
}
|
||||
|
||||
// For optional fields where empty means "use default", always apply so
|
||||
// that an explicit clear (--management-url "") persists across reinstalls.
|
||||
if !rootCmd.PersistentFlags().Changed("management-url") {
|
||||
|
||||
@@ -530,7 +530,6 @@ func fieldToGlobalVar(field string) string {
|
||||
m := map[string]string{
|
||||
"LogLevel": "logLevel",
|
||||
"DaemonAddr": "daemonAddr",
|
||||
"JSONSocket": "jsonSocket",
|
||||
"ManagementURL": "managementURL",
|
||||
"ConfigPath": "configPath",
|
||||
"LogFiles": "logFiles",
|
||||
@@ -538,7 +537,6 @@ func fieldToGlobalVar(field string) string {
|
||||
"DisableUpdateSettings": "updateSettingsDisabled",
|
||||
"EnableCapture": "captureEnabled",
|
||||
"DisableNetworks": "networksDisabled",
|
||||
"DisableJSONSocket": "jsonSocketDisabled",
|
||||
"ServiceEnvVars": "serviceEnvVars",
|
||||
}
|
||||
if v, ok := m[field]; ok {
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
//go:build !ios && !android
|
||||
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type socketListener struct {
|
||||
net.Listener
|
||||
network string
|
||||
address string
|
||||
}
|
||||
|
||||
func listenOnAddress(addr string) (*socketListener, error) {
|
||||
network, address, err := parseListenAddress(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if network == "unix" {
|
||||
removeStaleUnixSocket(address)
|
||||
}
|
||||
|
||||
listener, err := net.Listen(network, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &socketListener{Listener: listener, network: network, address: address}, nil
|
||||
}
|
||||
|
||||
func parseListenAddress(addr string) (string, string, error) {
|
||||
network, address, ok := strings.Cut(addr, "://")
|
||||
if !ok || network == "" || address == "" {
|
||||
return "", "", fmt.Errorf("address must be in [unix|tcp]://[path|host:port] format: %q", addr)
|
||||
}
|
||||
|
||||
switch network {
|
||||
case "unix", "tcp":
|
||||
return network, address, nil
|
||||
default:
|
||||
return "", "", fmt.Errorf("unsupported daemon address protocol: %v", network)
|
||||
}
|
||||
}
|
||||
|
||||
func removeStaleUnixSocket(path string) {
|
||||
stat, err := os.Stat(path)
|
||||
if err == nil && !stat.IsDir() {
|
||||
if err := os.Remove(path); err != nil {
|
||||
log.Debugf("remove socket file: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
log.Debugf("stat socket file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func removeStaleUnixSocketForAddress(addr string) {
|
||||
network, address, err := parseListenAddress(addr)
|
||||
if err != nil || network != "unix" {
|
||||
return
|
||||
}
|
||||
removeStaleUnixSocket(address)
|
||||
}
|
||||
|
||||
func (l *socketListener) chmodUnixSocket(description string) error {
|
||||
if l == nil || l.network != "unix" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.Chmod(l.address, 0666); err != nil {
|
||||
return fmt.Errorf("failed setting %s permissions for %s: %w", description, l.address, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -51,20 +51,13 @@ type cachedRecord struct {
|
||||
}
|
||||
|
||||
// Resolver caches critical NetBird infrastructure domains.
|
||||
// records, refreshing, failedResolves, mgmtDomain and serverDomains are all
|
||||
// guarded by mutex.
|
||||
// records, refreshing, mgmtDomain and serverDomains are all guarded by mutex.
|
||||
type Resolver struct {
|
||||
records map[dns.Question]*cachedRecord
|
||||
mgmtDomain *domain.Domain
|
||||
serverDomains *dnsconfig.ServerDomains
|
||||
mutex sync.RWMutex
|
||||
|
||||
// failedResolves records the last failed initial resolve per domain so a
|
||||
// domain that never resolves isn't retried on every server-domains update
|
||||
// until refreshBackoff elapses. Entries are cleared on success and pruned
|
||||
// to the current server-domains set.
|
||||
failedResolves map[domain.Domain]time.Time
|
||||
|
||||
chain ChainResolver
|
||||
chainMaxPriority int
|
||||
refreshGroup singleflight.Group
|
||||
@@ -83,10 +76,9 @@ type Resolver struct {
|
||||
// NewResolver creates a new management domains cache resolver.
|
||||
func NewResolver() *Resolver {
|
||||
return &Resolver{
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
failedResolves: make(map[domain.Domain]time.Time),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,9 +173,7 @@ func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
|
||||
|
||||
// AddDomain resolves a domain and stores its A/AAAA records in the cache.
|
||||
// A family that resolves NODATA (nil err, zero records) evicts any stale
|
||||
// entry for that qtype. When one family hard-errors while the other succeeds,
|
||||
// the resolved family is still cached but AddDomain returns an error so the
|
||||
// caller retries the incomplete resolve rather than treating it as complete.
|
||||
// entry for that qtype.
|
||||
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
@@ -213,10 +203,6 @@ func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
log.Debugf("added/updated domain=%s with %d A records and %d AAAA records",
|
||||
d.SafeString(), len(aRecords), len(aaaaRecords))
|
||||
|
||||
if errA != nil || errAAAA != nil {
|
||||
return fmt.Errorf("resolve %s: incomplete, a family failed: %w", d.SafeString(), errors.Join(errA, errAAAA))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -476,7 +462,6 @@ func (m *Resolver) RemoveDomain(d domain.Domain) error {
|
||||
delete(m.records, qAAAA)
|
||||
delete(m.refreshing, qA)
|
||||
delete(m.refreshing, qAAAA)
|
||||
delete(m.failedResolves, d)
|
||||
|
||||
log.Debugf("removed domain=%s from cache", d.SafeString())
|
||||
return nil
|
||||
@@ -520,7 +505,6 @@ func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dn
|
||||
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
|
||||
currentDomains := m.GetCachedDomains()
|
||||
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
|
||||
m.pruneFailedResolves(allDomains)
|
||||
}
|
||||
|
||||
m.addNewDomains(ctx, newDomains)
|
||||
@@ -593,85 +577,13 @@ func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
|
||||
return m.mgmtDomain != nil && domain == *m.mgmtDomain
|
||||
}
|
||||
|
||||
// addNewDomains resolves and caches domains that are not yet in the cache,
|
||||
// running the lookups concurrently. Domains already cached are skipped and left
|
||||
// to the stale-while-revalidate refresh path, so a sync never re-resolves them
|
||||
// synchronously: once NetBird owns the OS resolver the resolve runs through the
|
||||
// handler chain and would otherwise dial the managed upstreams under the engine
|
||||
// sync lock on every update.
|
||||
// addNewDomains resolves and caches all domains from the update
|
||||
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
|
||||
var wg sync.WaitGroup
|
||||
seen := make(map[domain.Domain]struct{}, len(newDomains))
|
||||
for _, newDomain := range newDomains {
|
||||
if _, dup := seen[newDomain]; dup {
|
||||
continue
|
||||
}
|
||||
seen[newDomain] = struct{}{}
|
||||
|
||||
if !m.needsResolve(newDomain) {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(d domain.Domain) {
|
||||
defer wg.Done()
|
||||
if err := m.AddDomain(ctx, d); err != nil {
|
||||
m.markResolveFailed(d)
|
||||
log.Warnf("failed to add/update domain=%s: %v", d.SafeString(), err)
|
||||
return
|
||||
}
|
||||
m.clearResolveFailed(d)
|
||||
log.Debugf("added/updated management cache domain=%s", d.SafeString())
|
||||
}(newDomain)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// needsResolve reports whether d should be resolved now. A recent failed or
|
||||
// incomplete resolve gates retries on the backoff even when one family is
|
||||
// already cached, so a transiently-failed family is retried instead of being
|
||||
// treated as fully resolved. Otherwise a domain with any cached record is left
|
||||
// to the stale-while-revalidate refresh path.
|
||||
func (m *Resolver) needsResolve(d domain.Domain) bool {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if failedAt, ok := m.failedResolves[d]; ok {
|
||||
return time.Since(failedAt) >= refreshBackoff
|
||||
}
|
||||
|
||||
for _, qtype := range []uint16{dns.TypeA, dns.TypeAAAA} {
|
||||
q := dns.Question{Name: dnsName, Qtype: qtype, Qclass: dns.ClassINET}
|
||||
if _, ok := m.records[q]; ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Resolver) markResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
m.failedResolves[d] = time.Now()
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (m *Resolver) clearResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
delete(m.failedResolves, d)
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
// pruneFailedResolves drops failure markers for domains no longer present in
|
||||
// the server-domains set, keeping the map bounded to the current set (a
|
||||
// failed-only domain has no cached record, so RemoveDomain never sees it).
|
||||
func (m *Resolver) pruneFailedResolves(domains domain.List) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
for d := range m.failedResolves {
|
||||
if !slices.Contains(domains, d) {
|
||||
delete(m.failedResolves, d)
|
||||
if err := m.AddDomain(ctx, newDomain); err != nil {
|
||||
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
|
||||
} else {
|
||||
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ type fakeChain struct {
|
||||
mu sync.Mutex
|
||||
calls map[string]int
|
||||
answers map[string][]dns.RR
|
||||
qErr map[string]error
|
||||
err error
|
||||
hasRoot bool
|
||||
onLookup func()
|
||||
@@ -31,7 +30,6 @@ func newFakeChain() *fakeChain {
|
||||
return &fakeChain{
|
||||
calls: map[string]int{},
|
||||
answers: map[string][]dns.RR{},
|
||||
qErr: map[string]error{},
|
||||
hasRoot: true,
|
||||
}
|
||||
}
|
||||
@@ -49,9 +47,6 @@ func (f *fakeChain) ResolveInternal(ctx context.Context, msg *dns.Msg, maxPriori
|
||||
f.calls[key]++
|
||||
answers := f.answers[key]
|
||||
err := f.err
|
||||
if err == nil {
|
||||
err = f.qErr[key]
|
||||
}
|
||||
onLookup := f.onLookup
|
||||
f.mu.Unlock()
|
||||
|
||||
@@ -80,12 +75,6 @@ func (f *fakeChain) setAnswer(name string, qtype uint16, ip string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeChain) setErr(name string, qtype uint16, err error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.qErr[name+"|"+dns.TypeToString[qtype]] = err
|
||||
}
|
||||
|
||||
func (f *fakeChain) callCount(name string, qtype uint16) int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
@@ -1,183 +0,0 @@
|
||||
package mgmt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||
"github.com/netbirdio/netbird/shared/management/domain"
|
||||
)
|
||||
|
||||
// A domain already in the cache must not be re-resolved on a subsequent server
|
||||
// domains update; it is left to the stale-while-revalidate refresh path.
|
||||
func TestResolver_UpdateFromServerDomains_SkipsCached(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("signal.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must resolve the domain")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"cached domain must not be re-resolved on a subsequent update")
|
||||
}
|
||||
|
||||
// New domains in a single update must resolve concurrently rather than serially.
|
||||
func TestResolver_AddNewDomains_ResolvesConcurrently(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
|
||||
var inflight, maxInflight atomic.Int32
|
||||
chain.onLookup = func() {
|
||||
n := inflight.Add(1)
|
||||
for {
|
||||
old := maxInflight.Load()
|
||||
if n <= old || maxInflight.CompareAndSwap(old, n) {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
inflight.Add(-1)
|
||||
}
|
||||
|
||||
relays := []domain.Domain{"a.example.com", "b.example.com", "c.example.com", "d.example.com"}
|
||||
for _, d := range relays {
|
||||
chain.setAnswer(dns.Fqdn(string(d)), dns.TypeA, "10.0.0.2")
|
||||
}
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
start := time.Now()
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: relays})
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
assert.GreaterOrEqual(t, int(maxInflight.Load()), 2, "domains must resolve concurrently")
|
||||
// Serial resolution of 4 domains would take at least 4*50ms; concurrent is far less.
|
||||
assert.Less(t, elapsed, 300*time.Millisecond, "resolution should not be serial")
|
||||
}
|
||||
|
||||
// A domain that fails to resolve must not be retried on every update; the
|
||||
// failure backoff suppresses re-resolution until it expires.
|
||||
func TestResolver_UpdateFromServerDomains_BacksOffFailures(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must attempt the resolve")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"failed resolve must back off and not retry on the next update")
|
||||
}
|
||||
|
||||
// A domain listed under more than one server-domain type (e.g. STUN and TURN on
|
||||
// the same host) must be resolved once per update, not once per occurrence.
|
||||
func TestResolver_AddNewDomains_DedupesDuplicateDomains(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("dup.example.com.", dns.TypeA, "10.0.0.9")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{
|
||||
Stuns: []domain.Domain{"dup.example.com"},
|
||||
Turns: []domain.Domain{"dup.example.com"},
|
||||
}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("dup.example.com.", dns.TypeA),
|
||||
"a domain appearing under multiple server-domain types must resolve once")
|
||||
}
|
||||
|
||||
// A failure marker must be dropped once its domain leaves the server-domains set
|
||||
// so the map stays bounded to the current set.
|
||||
func TestResolver_UpdateFromServerDomains_PrunesFailedResolves(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("gone.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, marked, "failed resolve must be recorded")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("other.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, stillMarked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, stillMarked, "failure marker for a domain no longer in the set must be pruned")
|
||||
}
|
||||
|
||||
// When one family hard-errors while the other resolves, the domain is cached
|
||||
// for the working family but recorded as incomplete so the failed family is
|
||||
// retried under backoff instead of being treated as fully resolved forever.
|
||||
func TestResolver_AddNewDomains_RetriesPartialFamilyFailure(t *testing.T) {
|
||||
d := domain.Domain("relay.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("relay.example.com.", dns.TypeA, "10.0.0.2")
|
||||
chain.setErr("relay.example.com.", dns.TypeAAAA, errors.New("servfail"))
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, aCached := r.records[dns.Question{Name: "relay.example.com.", Qtype: dns.TypeA, Qclass: dns.ClassINET}]
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, aCached, "the working family must still be cached")
|
||||
require.True(t, marked, "a partial failure must be recorded so the failed family is retried")
|
||||
|
||||
assert.False(t, r.needsResolve(d), "within the backoff window the domain is not retried")
|
||||
|
||||
r.mutex.Lock()
|
||||
r.failedResolves[d] = time.Now().Add(-2 * refreshBackoff)
|
||||
r.mutex.Unlock()
|
||||
assert.True(t, r.needsResolve(d), "after the backoff elapses the domain is retried to pick up the missing family")
|
||||
}
|
||||
|
||||
// A family that returns NODATA (legitimately absent, e.g. an IPv4-only host) is
|
||||
// not a failure: the domain must not be marked for retry, otherwise it would be
|
||||
// re-resolved on every sync.
|
||||
func TestResolver_AddNewDomains_NodataIsNotFailure(t *testing.T) {
|
||||
d := domain.Domain("v4only.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("v4only.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, marked, "a NODATA family must not be recorded as a failure")
|
||||
assert.False(t, r.needsResolve(d), "an IPv4-only host must not be re-resolved on later syncs")
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -12,11 +12,5 @@ script_path=$(dirname "$(realpath "$0")")
|
||||
cd "$script_path"
|
||||
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.36.6
|
||||
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.6.1
|
||||
go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v2.26.3
|
||||
protoc -I ./ ./daemon.proto \
|
||||
--go_out=../ \
|
||||
--go-grpc_out=../ \
|
||||
--grpc-gateway_out=../ \
|
||||
--grpc-gateway_opt=generate_unbound_methods=true \
|
||||
--experimental_allow_proto3_optional
|
||||
protoc -I ./ ./daemon.proto --go_out=../ --go-grpc_out=../ --experimental_allow_proto3_optional
|
||||
cd "$old_pwd"
|
||||
|
||||
2
go.mod
2
go.mod
@@ -66,7 +66,6 @@ require (
|
||||
github.com/google/nftables v0.3.0
|
||||
github.com/gopacket/gopacket v1.4.0
|
||||
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.2-0.20240212192251-757544f21357
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
|
||||
github.com/hashicorp/go-multierror v1.1.1
|
||||
github.com/hashicorp/go-secure-stdlib/base62 v0.1.2
|
||||
github.com/hashicorp/go-version v1.7.0
|
||||
@@ -331,7 +330,6 @@ require (
|
||||
golang.org/x/text v0.36.0 // indirect
|
||||
golang.org/x/tools v0.43.0 // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260319201613-d00831a3d3e7 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
|
||||
@@ -41,7 +41,7 @@ func TestAffectedPeers_DependencyCoverageMatrix(t *testing.T) {
|
||||
_, err := s.manager.SavePolicy(ctx, s.accountID, userID, peerToResourcePolicyByGroup(s.sourceGroupID, s.resourceGroupID), true)
|
||||
require.NoError(t, err)
|
||||
return affectedpeers.Change{ChangedPeerIDs: []string{s.routerPeerID}},
|
||||
[]string{s.sourcePeerID}, []string{s.unrelatedPeerID}
|
||||
[]string{s.sourcePeerID, s.routerPeerID}, []string{s.unrelatedPeerID}
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -106,12 +106,8 @@ func TestAffectedPeers_DependencyCoverageMatrix(t *testing.T) {
|
||||
change, mustContain, mustExclude := r.build(t, s, ctx)
|
||||
affected := resolveAffected(t, s.manager.Store, s.accountID, change)
|
||||
|
||||
for _, id := range mustContain {
|
||||
assert.Contains(t, affected, id, "expected peer to be affected")
|
||||
}
|
||||
for _, id := range mustExclude {
|
||||
assert.NotContains(t, affected, id, "peer must not be affected")
|
||||
}
|
||||
assert.ElementsMatch(t, affected, mustContain, "expected peer to be affected")
|
||||
assert.NotContains(t, affected, mustExclude, "peer must not be affected")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,33 +96,54 @@ func affectedGroupID(i int) string { return fmt.Sprintf("affected-grp-%d", i)
|
||||
func affectedGroupName(i int) string { return fmt.Sprintf("AffectedGroup%d", i) }
|
||||
|
||||
func TestCollectGroupChange_PolicyLinked(t *testing.T) {
|
||||
manager, s, accountID, _, groupIDs := setupAffectedPeersTest(t)
|
||||
manager, s, accountID, peerIDs, groupIDs := setupAffectedPeersTest(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, err := manager.SavePolicy(ctx, accountID, userID, &types.Policy{
|
||||
Enabled: true,
|
||||
Rules: []*types.PolicyRule{
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
Bidirectional: true,
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
SourceResource: types.Resource{ID: peerIDs[0], Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: peerIDs[1], Type: types.ResourceTypePeer},
|
||||
Bidirectional: true,
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
SourceResource: types.Resource{ID: peerIDs[2], Type: types.ResourceTypeHost},
|
||||
DestinationResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypeHost},
|
||||
Bidirectional: true,
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
SourceResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
|
||||
Bidirectional: true,
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
},
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
groups, _ := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
|
||||
assert.Contains(t, groups, groupIDs[0])
|
||||
assert.Contains(t, groups, groupIDs[1])
|
||||
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.ElementsMatch(t, directPeers, []string{peerIDs[1]})
|
||||
|
||||
groups, _ = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
|
||||
assert.Contains(t, groups, groupIDs[0])
|
||||
assert.Contains(t, groups, groupIDs[1])
|
||||
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.ElementsMatch(t, directPeers, []string{peerIDs[0]})
|
||||
|
||||
groups, _ = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
|
||||
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
|
||||
assert.Empty(t, groups)
|
||||
assert.Empty(t, directPeers)
|
||||
}
|
||||
|
||||
func TestCollectGroupChange_PolicyWithDirectPeerResource(t *testing.T) {
|
||||
@@ -133,20 +154,44 @@ func TestCollectGroupChange_PolicyWithDirectPeerResource(t *testing.T) {
|
||||
Enabled: true,
|
||||
Rules: []*types.PolicyRule{
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
SourceResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypePeer},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
SourceResource: types.Resource{ID: peerIDs[3], Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: peerIDs[4], Type: types.ResourceTypePeer},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
SourceResource: types.Resource{ID: peerIDs[1], Type: types.ResourceTypeHost},
|
||||
DestinationResource: types.Resource{ID: peerIDs[2], Type: types.ResourceTypeHost},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
{
|
||||
Enabled: true,
|
||||
Sources: []string{groupIDs[0]},
|
||||
SourceResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
|
||||
Destinations: []string{groupIDs[1]},
|
||||
Action: types.PolicyTrafficActionAccept,
|
||||
},
|
||||
},
|
||||
}, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
|
||||
assert.Contains(t, groups, groupIDs[0])
|
||||
assert.Contains(t, groups, groupIDs[1])
|
||||
assert.Contains(t, directPeers, peerIDs[3])
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.ElementsMatch(t, directPeers, []string{peerIDs[4]})
|
||||
|
||||
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[1]})
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.ElementsMatch(t, directPeers, []string{peerIDs[3]})
|
||||
|
||||
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[2]})
|
||||
assert.Empty(t, groups)
|
||||
assert.Empty(t, directPeers)
|
||||
}
|
||||
|
||||
func TestCollectGroupChange_PolicyWithNonPeerResource_NoDirectPeers(t *testing.T) {
|
||||
@@ -168,8 +213,7 @@ func TestCollectGroupChange_PolicyWithNonPeerResource_NoDirectPeers(t *testing.T
|
||||
require.NoError(t, err)
|
||||
|
||||
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
|
||||
assert.Contains(t, groups, groupIDs[0])
|
||||
assert.Contains(t, groups, groupIDs[1])
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.Empty(t, directPeers, "non-peer resources should not produce direct peer IDs")
|
||||
}
|
||||
|
||||
@@ -373,17 +417,11 @@ func TestCollectGroupChange_MultipleEntities(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
groups, directPeers := collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[0]})
|
||||
assert.Contains(t, groups, groupIDs[0])
|
||||
assert.Contains(t, groups, groupIDs[1])
|
||||
assert.NotContains(t, groups, groupIDs[2])
|
||||
assert.NotContains(t, groups, groupIDs[3])
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[0], groupIDs[1]})
|
||||
assert.Empty(t, directPeers)
|
||||
|
||||
groups, directPeers = collectGroupChangeAffectedGroups(ctx, s, accountID, []string{groupIDs[3]})
|
||||
assert.Contains(t, groups, groupIDs[2])
|
||||
assert.Contains(t, groups, groupIDs[3])
|
||||
assert.NotContains(t, groups, groupIDs[0])
|
||||
assert.NotContains(t, groups, groupIDs[1])
|
||||
assert.ElementsMatch(t, groups, []string{groupIDs[2], groupIDs[3]})
|
||||
assert.Empty(t, directPeers)
|
||||
}
|
||||
|
||||
@@ -474,7 +512,7 @@ func TestResolveAffectedPeers_PolicyThreeGroups(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0]})
|
||||
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2]}, result)
|
||||
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[2]}, result)
|
||||
}
|
||||
|
||||
func TestResolveAffectedPeers_RoutePeerGroups(t *testing.T) {
|
||||
@@ -697,7 +735,7 @@ func TestResolveAffectedPeers_MultipleChangedPeers(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
result := manager.resolveAffectedPeersForPeerChanges(ctx, s, accountID, []string{peerIDs[0], peerIDs[2]})
|
||||
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[1], peerIDs[2], peerIDs[3]}, result)
|
||||
assert.ElementsMatch(t, []string{peerIDs[0], peerIDs[2], peerIDs[1], peerIDs[3]}, result)
|
||||
}
|
||||
|
||||
func TestResolveAffectedPeers_SharedGroupAcrossPolicyAndRoute(t *testing.T) {
|
||||
|
||||
@@ -11,6 +11,8 @@ package affectedpeers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"maps"
|
||||
"slices"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
@@ -447,15 +449,24 @@ func (r *resolver) collectFromPostureChecks(postureCheckIDs []string) {
|
||||
|
||||
func (r *resolver) collectFromPolicies() {
|
||||
for _, policy := range r.policies() {
|
||||
matchedByGroup := policyReferencesGroups(policy, r.changedGroupSet)
|
||||
matchedByPeer := len(r.changedPeerSet) > 0 && policyReferencesDirectPeers(policy, r.changedPeerSet)
|
||||
if !matchedByGroup && !matchedByPeer {
|
||||
// changed peer IDs have been mapped to changedGroupSet on resolver creation (see seedChangedGroupsFromPeers)
|
||||
// there's no change to the groupSet if the same policies have been changed directly
|
||||
peerIdsViaGroups, groupIdsViaGroups := getGroupsAndPeersFromPolicyViaGroups(policy, r.changedGroupSet)
|
||||
addAll(r.groupSet, groupIdsViaGroups)
|
||||
addAll(r.peerSet, peerIdsViaGroups)
|
||||
|
||||
peerIdsViaPeers, groupIdsViaPeers := getGroupsAndPeersFromPolicyViaPeers(policy, r.changedPeerSet)
|
||||
addAll(r.groupSet, groupIdsViaPeers)
|
||||
addAll(r.peerSet, peerIdsViaPeers)
|
||||
|
||||
hasGroupChanges := len(groupIdsViaPeers) > 0 || len(groupIdsViaGroups) > 0
|
||||
hasPeerChanges := len(peerIdsViaPeers) > 0 || len(peerIdsViaGroups) > 0
|
||||
if !hasGroupChanges && !hasPeerChanges {
|
||||
continue
|
||||
}
|
||||
|
||||
log.WithContext(r.ctx).Tracef("collectFromPolicies: policy %s (%s) matched (byGroup=%t byPeer=%t) -> folding rule groups %v + direct peers",
|
||||
policy.ID, policy.Name, matchedByGroup, matchedByPeer, policy.RuleGroups())
|
||||
addAll(r.groupSet, policy.RuleGroups())
|
||||
collectPolicyDirectPeers(policy, r.peerSet)
|
||||
policy.ID, policy.Name, hasGroupChanges, hasPeerChanges, policy.RuleGroups())
|
||||
r.matchedPolicies = append(r.matchedPolicies, policy)
|
||||
}
|
||||
}
|
||||
@@ -734,22 +745,60 @@ func collectPolicySources(policy *types.Policy, groupSet, peerSet map[string]str
|
||||
}
|
||||
}
|
||||
|
||||
func policyReferencesGroups(policy *types.Policy, groupSet map[string]struct{}) bool {
|
||||
// returns group and peer IDs on the opposite side of the policy:
|
||||
// i.e. if a group is present in the policy rule sources, return destination group IDs and the destinationResource from the rule
|
||||
// and vice-versa
|
||||
func getGroupsAndPeersFromPolicyViaGroups(policy *types.Policy, groupSet map[string]struct{}) ([]string, []string) {
|
||||
var groupIds, peerIds []string
|
||||
if len(groupSet) == 0 {
|
||||
return peerIds, groupIds
|
||||
}
|
||||
for _, rule := range policy.Rules {
|
||||
if anyInSet(rule.Sources, groupSet) || anyInSet(rule.Destinations, groupSet) {
|
||||
return true
|
||||
if matchedIds, ok := allInSet(rule.Sources, groupSet); ok {
|
||||
groupIds = append(groupIds, matchedIds...)
|
||||
groupIds = append(groupIds, rule.Destinations...)
|
||||
if rule.DestinationResource.Type == types.ResourceTypePeer && rule.DestinationResource.ID != "" {
|
||||
peerIds = append(peerIds, rule.DestinationResource.ID)
|
||||
}
|
||||
}
|
||||
if matchedIds, ok := allInSet(rule.Destinations, groupSet); ok {
|
||||
groupIds = append(groupIds, matchedIds...)
|
||||
groupIds = append(groupIds, rule.Sources...)
|
||||
if rule.SourceResource.Type == types.ResourceTypePeer && rule.SourceResource.ID != "" {
|
||||
peerIds = append(peerIds, rule.SourceResource.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
return peerIds, groupIds
|
||||
}
|
||||
|
||||
func policyReferencesDirectPeers(policy *types.Policy, changedSet map[string]struct{}) bool {
|
||||
// returns group and peer IDs on the opposite side of the policy:
|
||||
// i.e. if a peer is present in the policy rule sourceResources, return destination group IDs and the destinationResource from the rule
|
||||
// and vice-versa
|
||||
func getGroupsAndPeersFromPolicyViaPeers(policy *types.Policy, changedSet map[string]struct{}) ([]string, []string) {
|
||||
peerIds := make(map[string]struct{})
|
||||
var groupIds []string
|
||||
if len(changedSet) == 0 {
|
||||
return []string{}, groupIds
|
||||
}
|
||||
for _, rule := range policy.Rules {
|
||||
if isDirectPeerInSet(rule.SourceResource, changedSet) || isDirectPeerInSet(rule.DestinationResource, changedSet) {
|
||||
return true
|
||||
if isDirectPeerInSet(rule.SourceResource, changedSet) {
|
||||
groupIds = append(groupIds, rule.Destinations...)
|
||||
peerIds[rule.SourceResource.ID] = struct{}{}
|
||||
if rule.DestinationResource.Type == types.ResourceTypePeer && rule.DestinationResource.ID != "" {
|
||||
peerIds[rule.DestinationResource.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
// it's possible that the changeSet contains peer ids of both source and destination resources
|
||||
if isDirectPeerInSet(rule.DestinationResource, changedSet) {
|
||||
groupIds = append(groupIds, rule.Sources...)
|
||||
peerIds[rule.DestinationResource.ID] = struct{}{}
|
||||
if rule.SourceResource.Type == types.ResourceTypePeer && rule.SourceResource.ID != "" {
|
||||
peerIds[rule.SourceResource.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
return slices.Collect(maps.Keys(peerIds)), groupIds
|
||||
}
|
||||
|
||||
func policyReferencesPostureChecks(policy *types.Policy, ids map[string]struct{}) bool {
|
||||
@@ -795,6 +844,16 @@ func anyInSet(ids []string, set map[string]struct{}) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func allInSet(ids []string, set map[string]struct{}) ([]string, bool) {
|
||||
var matchedIds []string
|
||||
for _, id := range ids {
|
||||
if _, ok := set[id]; ok {
|
||||
matchedIds = append(matchedIds, id)
|
||||
}
|
||||
}
|
||||
return matchedIds, len(matchedIds) > 0
|
||||
}
|
||||
|
||||
func isInSet(id string, set map[string]struct{}) bool {
|
||||
_, ok := set[id]
|
||||
return ok
|
||||
|
||||
@@ -80,24 +80,189 @@ func TestChangeIsEmpty(t *testing.T) {
|
||||
assert.False(t, Change{PostureCheckIDs: []string{"pc"}}.isEmpty())
|
||||
}
|
||||
|
||||
func TestPolicyReferencesGroups(t *testing.T) {
|
||||
policy := &types.Policy{Rules: []*types.PolicyRule{{Sources: []string{"g1", "g2"}, Destinations: []string{"g3"}}}}
|
||||
func TestGroupsFromPolicyDirectionally(t *testing.T) {
|
||||
policy := &types.Policy{Rules: []*types.PolicyRule{
|
||||
{Sources: []string{"g1", "g2"}, Destinations: []string{"g3"}},
|
||||
{Sources: []string{"g4"}, Destinations: []string{"g5", "g6"}},
|
||||
{Sources: []string{"g7"}, Destinations: []string{"g8"},
|
||||
SourceResource: types.Resource{ID: "r7", Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: "r8", Type: types.ResourceTypePeer}},
|
||||
{Sources: []string{"g9"}, Destinations: []string{"g10"},
|
||||
SourceResource: types.Resource{ID: "", Type: types.ResourceTypePeer},
|
||||
DestinationResource: types.Resource{ID: "", Type: types.ResourceTypePeer}},
|
||||
{Sources: []string{"g11"}, Destinations: []string{"g12"},
|
||||
SourceResource: types.Resource{ID: "r11", Type: types.ResourceTypeHost},
|
||||
DestinationResource: types.Resource{ID: "r12", Type: types.ResourceTypeHost}},
|
||||
}}
|
||||
|
||||
assert.True(t, policyReferencesGroups(policy, map[string]struct{}{"g1": {}}))
|
||||
assert.True(t, policyReferencesGroups(policy, map[string]struct{}{"g3": {}}))
|
||||
assert.False(t, policyReferencesGroups(policy, map[string]struct{}{"g4": {}}))
|
||||
assert.False(t, policyReferencesGroups(policy, map[string]struct{}{}))
|
||||
var tests = []struct {
|
||||
name string
|
||||
inGroups map[string]struct{}
|
||||
expectedPeerIds []string
|
||||
expectedGroupIds []string
|
||||
}{
|
||||
{
|
||||
name: "match sources",
|
||||
inGroups: map[string]struct{}{"g1": {}, "g4": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g1", "g4", "g3", "g5", "g6"},
|
||||
},
|
||||
{
|
||||
name: "match destinations",
|
||||
inGroups: map[string]struct{}{"g3": {}, "g6": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g1", "g2", "g4", "g3", "g6"},
|
||||
},
|
||||
{
|
||||
name: "should return destinations and destination resource",
|
||||
inGroups: map[string]struct{}{"g7": {}},
|
||||
expectedPeerIds: []string{"r8"},
|
||||
expectedGroupIds: []string{"g7", "g8"},
|
||||
},
|
||||
{
|
||||
name: "should return sources and source resource",
|
||||
inGroups: map[string]struct{}{"g8": {}},
|
||||
expectedPeerIds: []string{"r7"},
|
||||
expectedGroupIds: []string{"g7", "g8"},
|
||||
},
|
||||
{
|
||||
name: "should not return source resource (empty id)",
|
||||
inGroups: map[string]struct{}{"g10": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g9", "g10"},
|
||||
},
|
||||
{
|
||||
name: "should not return destination resource (empty id)",
|
||||
inGroups: map[string]struct{}{"g9": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g9", "g10"},
|
||||
},
|
||||
{
|
||||
name: "should not return source resource (non-peer type)",
|
||||
inGroups: map[string]struct{}{"g12": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g11", "g12"},
|
||||
},
|
||||
{
|
||||
name: "should not return destination resource (non-peer type)",
|
||||
inGroups: map[string]struct{}{"g12": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{"g11", "g12"},
|
||||
},
|
||||
{
|
||||
name: "non-existing group",
|
||||
inGroups: map[string]struct{}{"g33": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{},
|
||||
},
|
||||
{
|
||||
name: "empty groupset",
|
||||
inGroups: map[string]struct{}{},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
peerIds, groupIds := getGroupsAndPeersFromPolicyViaGroups(policy, tt.inGroups)
|
||||
assert.ElementsMatch(t, peerIds, tt.expectedPeerIds)
|
||||
assert.ElementsMatch(t, groupIds, tt.expectedGroupIds)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPolicyReferencesDirectPeers(t *testing.T) {
|
||||
policy := &types.Policy{Rules: []*types.PolicyRule{{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p1"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypeHost, ID: "r1"},
|
||||
}}}
|
||||
policy := &types.Policy{Rules: []*types.PolicyRule{
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p1"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypePeer, ID: "r1"},
|
||||
Sources: []string{"sg1"},
|
||||
Destinations: []string{"dg1"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p2"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypePeer, ID: "r2"},
|
||||
Sources: []string{"sg2"},
|
||||
Destinations: []string{"dg2"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p3"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypeHost, ID: "r3"},
|
||||
Sources: []string{"sg3"},
|
||||
Destinations: []string{"dg3"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypeHost, ID: "p4"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypePeer, ID: "r4"},
|
||||
Sources: []string{"sg4"},
|
||||
Destinations: []string{"dg4"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypeHost, ID: "p5"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypePeer, ID: "r5"},
|
||||
Sources: []string{"sg5"},
|
||||
Destinations: []string{"dg5"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p6"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypeHost, ID: "r6"},
|
||||
Sources: []string{"sg6"},
|
||||
Destinations: []string{"dg6"},
|
||||
},
|
||||
{
|
||||
SourceResource: types.Resource{Type: types.ResourceTypePeer, ID: "p7"},
|
||||
DestinationResource: types.Resource{Type: types.ResourceTypePeer, ID: "r7"},
|
||||
Sources: []string{"sg7"},
|
||||
Destinations: []string{"dg7"},
|
||||
},
|
||||
}}
|
||||
|
||||
assert.True(t, policyReferencesDirectPeers(policy, map[string]struct{}{"p1": {}}))
|
||||
assert.False(t, policyReferencesDirectPeers(policy, map[string]struct{}{"r1": {}}))
|
||||
assert.False(t, policyReferencesDirectPeers(policy, map[string]struct{}{"p2": {}}))
|
||||
var tests = []struct {
|
||||
name string
|
||||
changedPeerIds map[string]struct{}
|
||||
expectedPeerIds []string
|
||||
expectedGroupIds []string
|
||||
}{
|
||||
{
|
||||
name: "match sources",
|
||||
changedPeerIds: map[string]struct{}{"p1": {}, "p2": {}},
|
||||
expectedPeerIds: []string{"p1", "p2", "r1", "r2"},
|
||||
expectedGroupIds: []string{"dg1", "dg2"},
|
||||
},
|
||||
{
|
||||
name: "match destinations",
|
||||
changedPeerIds: map[string]struct{}{"r1": {}, "r2": {}},
|
||||
expectedPeerIds: []string{"r1", "r2", "p1", "p2"},
|
||||
expectedGroupIds: []string{"sg1", "sg2"},
|
||||
},
|
||||
{
|
||||
name: "wrong opposing peer types, only changed peer ids and groups on the opposing end of the rule",
|
||||
changedPeerIds: map[string]struct{}{"p3": {}, "r4": {}},
|
||||
expectedPeerIds: []string{"p3", "r4"},
|
||||
expectedGroupIds: []string{"dg3", "sg4"},
|
||||
},
|
||||
{
|
||||
name: "wrong peer type, no matching peer ids",
|
||||
changedPeerIds: map[string]struct{}{"p5": {}, "r6": {}},
|
||||
expectedPeerIds: []string{},
|
||||
expectedGroupIds: []string{},
|
||||
},
|
||||
{
|
||||
name: "changed peers on both sides of the policy",
|
||||
changedPeerIds: map[string]struct{}{"p7": {}, "r7": {}},
|
||||
expectedPeerIds: []string{"p7", "r7"},
|
||||
expectedGroupIds: []string{"sg7", "dg7"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
peerIds, groupIds := getGroupsAndPeersFromPolicyViaPeers(policy, tt.changedPeerIds)
|
||||
assert.ElementsMatch(t, peerIds, tt.expectedPeerIds)
|
||||
assert.ElementsMatch(t, groupIds, tt.expectedGroupIds)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPolicyReferencesPostureChecks(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user