mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-17 07:46:38 +00:00
Compare commits
12 Commits
fix/macos-
...
add-slack-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
91a6577182 | ||
|
|
13539543af | ||
|
|
7483fec048 | ||
|
|
5259e5df51 | ||
|
|
ebd78e0122 | ||
|
|
cf86b9a528 | ||
|
|
ee588e1536 | ||
|
|
2a8aacc5c9 | ||
|
|
15709bc666 | ||
|
|
789b4113fe | ||
|
|
d2cdc0efec | ||
|
|
ee343d5d77 |
@@ -6,6 +6,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"github.com/coreos/go-iptables/iptables"
|
||||
"github.com/google/nftables"
|
||||
@@ -35,20 +36,27 @@ const SKIP_NFTABLES_ENV = "NB_SKIP_NFTABLES_CHECK"
|
||||
type FWType int
|
||||
|
||||
func NewFirewall(iface IFaceMapper, stateManager *statemanager.Manager, flowLogger nftypes.FlowLogger, disableServerRoutes bool, mtu uint16) (firewall.Manager, error) {
|
||||
// on the linux system we try to user nftables or iptables
|
||||
// in any case, because we need to allow netbird interface traffic
|
||||
// so we use AllowNetbird traffic from these firewall managers
|
||||
// for the userspace packet filtering firewall
|
||||
// We run in userspace mode and force userspace firewall was requested. We don't attempt native firewall.
|
||||
if iface.IsUserspaceBind() && forceUserspaceFirewall() {
|
||||
log.Info("forcing userspace firewall")
|
||||
return createUserspaceFirewall(iface, nil, disableServerRoutes, flowLogger, mtu)
|
||||
}
|
||||
|
||||
// Use native firewall for either kernel or userspace, the interface appears identical to netfilter
|
||||
fm, err := createNativeFirewall(iface, stateManager, disableServerRoutes, mtu)
|
||||
|
||||
// Kernel cannot fall back to anything else, need to return error
|
||||
if !iface.IsUserspaceBind() {
|
||||
return fm, err
|
||||
}
|
||||
|
||||
// Fall back to the userspace packet filter if native is unavailable
|
||||
if err != nil {
|
||||
log.Warnf("failed to create native firewall: %v. Proceeding with userspace", err)
|
||||
return createUserspaceFirewall(iface, nil, disableServerRoutes, flowLogger, mtu)
|
||||
}
|
||||
return createUserspaceFirewall(iface, fm, disableServerRoutes, flowLogger, mtu)
|
||||
|
||||
return fm, nil
|
||||
}
|
||||
|
||||
func createNativeFirewall(iface IFaceMapper, stateManager *statemanager.Manager, routes bool, mtu uint16) (firewall.Manager, error) {
|
||||
@@ -160,3 +168,17 @@ func isIptablesClientAvailable(client *iptables.IPTables) bool {
|
||||
_, err := client.ListChains("filter")
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func forceUserspaceFirewall() bool {
|
||||
val := os.Getenv(EnvForceUserspaceFirewall)
|
||||
if val == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
force, err := strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
log.Warnf("failed to parse %s: %v", EnvForceUserspaceFirewall, err)
|
||||
return false
|
||||
}
|
||||
return force
|
||||
}
|
||||
|
||||
@@ -7,6 +7,12 @@ import (
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
)
|
||||
|
||||
// EnvForceUserspaceFirewall forces the use of the userspace packet filter even when
|
||||
// native iptables/nftables is available. This only applies when the WireGuard interface
|
||||
// runs in userspace mode. When set, peer ACLs are handled by USPFilter instead of
|
||||
// kernel netfilter rules.
|
||||
const EnvForceUserspaceFirewall = "NB_FORCE_USERSPACE_FIREWALL"
|
||||
|
||||
// IFaceMapper defines subset methods of interface required for manager
|
||||
type IFaceMapper interface {
|
||||
Name() string
|
||||
|
||||
@@ -33,7 +33,6 @@ type Manager struct {
|
||||
type iFaceMapper interface {
|
||||
Name() string
|
||||
Address() wgaddr.Address
|
||||
IsUserspaceBind() bool
|
||||
}
|
||||
|
||||
// Create iptables firewall manager
|
||||
@@ -64,10 +63,9 @@ func Create(wgIface iFaceMapper, mtu uint16) (*Manager, error) {
|
||||
func (m *Manager) Init(stateManager *statemanager.Manager) error {
|
||||
state := &ShutdownState{
|
||||
InterfaceState: &InterfaceState{
|
||||
NameStr: m.wgIface.Name(),
|
||||
WGAddress: m.wgIface.Address(),
|
||||
UserspaceBind: m.wgIface.IsUserspaceBind(),
|
||||
MTU: m.router.mtu,
|
||||
NameStr: m.wgIface.Name(),
|
||||
WGAddress: m.wgIface.Address(),
|
||||
MTU: m.router.mtu,
|
||||
},
|
||||
}
|
||||
stateManager.RegisterState(state)
|
||||
@@ -203,12 +201,10 @@ func (m *Manager) Close(stateManager *statemanager.Manager) error {
|
||||
return nberrors.FormatErrorOrNil(merr)
|
||||
}
|
||||
|
||||
// AllowNetbird allows netbird interface traffic
|
||||
// AllowNetbird allows netbird interface traffic.
|
||||
// This is called when USPFilter wraps the native firewall, adding blanket accept
|
||||
// rules so that packet filtering is handled in userspace instead of by netfilter.
|
||||
func (m *Manager) AllowNetbird() error {
|
||||
if !m.wgIface.IsUserspaceBind() {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := m.AddPeerFiltering(
|
||||
nil,
|
||||
net.IP{0, 0, 0, 0},
|
||||
|
||||
@@ -47,8 +47,6 @@ func (i *iFaceMock) Address() wgaddr.Address {
|
||||
panic("AddressFunc is not set")
|
||||
}
|
||||
|
||||
func (i *iFaceMock) IsUserspaceBind() bool { return false }
|
||||
|
||||
func TestIptablesManager(t *testing.T) {
|
||||
ipv4Client, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -9,10 +9,9 @@ import (
|
||||
)
|
||||
|
||||
type InterfaceState struct {
|
||||
NameStr string `json:"name"`
|
||||
WGAddress wgaddr.Address `json:"wg_address"`
|
||||
UserspaceBind bool `json:"userspace_bind"`
|
||||
MTU uint16 `json:"mtu"`
|
||||
NameStr string `json:"name"`
|
||||
WGAddress wgaddr.Address `json:"wg_address"`
|
||||
MTU uint16 `json:"mtu"`
|
||||
}
|
||||
|
||||
func (i *InterfaceState) Name() string {
|
||||
@@ -23,10 +22,6 @@ func (i *InterfaceState) Address() wgaddr.Address {
|
||||
return i.WGAddress
|
||||
}
|
||||
|
||||
func (i *InterfaceState) IsUserspaceBind() bool {
|
||||
return i.UserspaceBind
|
||||
}
|
||||
|
||||
type ShutdownState struct {
|
||||
sync.Mutex
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ func getTableName() string {
|
||||
type iFaceMapper interface {
|
||||
Name() string
|
||||
Address() wgaddr.Address
|
||||
IsUserspaceBind() bool
|
||||
}
|
||||
|
||||
// Manager of iptables firewall
|
||||
@@ -106,10 +105,9 @@ func (m *Manager) Init(stateManager *statemanager.Manager) error {
|
||||
// cleanup using Close() without needing to store specific rules.
|
||||
if err := stateManager.UpdateState(&ShutdownState{
|
||||
InterfaceState: &InterfaceState{
|
||||
NameStr: m.wgIface.Name(),
|
||||
WGAddress: m.wgIface.Address(),
|
||||
UserspaceBind: m.wgIface.IsUserspaceBind(),
|
||||
MTU: m.router.mtu,
|
||||
NameStr: m.wgIface.Name(),
|
||||
WGAddress: m.wgIface.Address(),
|
||||
MTU: m.router.mtu,
|
||||
},
|
||||
}); err != nil {
|
||||
log.Errorf("failed to update state: %v", err)
|
||||
@@ -205,12 +203,10 @@ func (m *Manager) RemoveNatRule(pair firewall.RouterPair) error {
|
||||
return m.router.RemoveNatRule(pair)
|
||||
}
|
||||
|
||||
// AllowNetbird allows netbird interface traffic
|
||||
// AllowNetbird allows netbird interface traffic.
|
||||
// This is called when USPFilter wraps the native firewall, adding blanket accept
|
||||
// rules so that packet filtering is handled in userspace instead of by netfilter.
|
||||
func (m *Manager) AllowNetbird() error {
|
||||
if !m.wgIface.IsUserspaceBind() {
|
||||
return nil
|
||||
}
|
||||
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
|
||||
|
||||
@@ -52,8 +52,6 @@ func (i *iFaceMock) Address() wgaddr.Address {
|
||||
panic("AddressFunc is not set")
|
||||
}
|
||||
|
||||
func (i *iFaceMock) IsUserspaceBind() bool { return false }
|
||||
|
||||
func TestNftablesManager(t *testing.T) {
|
||||
|
||||
// just check on the local interface
|
||||
|
||||
@@ -8,10 +8,9 @@ import (
|
||||
)
|
||||
|
||||
type InterfaceState struct {
|
||||
NameStr string `json:"name"`
|
||||
WGAddress wgaddr.Address `json:"wg_address"`
|
||||
UserspaceBind bool `json:"userspace_bind"`
|
||||
MTU uint16 `json:"mtu"`
|
||||
NameStr string `json:"name"`
|
||||
WGAddress wgaddr.Address `json:"wg_address"`
|
||||
MTU uint16 `json:"mtu"`
|
||||
}
|
||||
|
||||
func (i *InterfaceState) Name() string {
|
||||
@@ -22,10 +21,6 @@ func (i *InterfaceState) Address() wgaddr.Address {
|
||||
return i.WGAddress
|
||||
}
|
||||
|
||||
func (i *InterfaceState) IsUserspaceBind() bool {
|
||||
return i.UserspaceBind
|
||||
}
|
||||
|
||||
type ShutdownState struct {
|
||||
InterfaceState *InterfaceState `json:"interface_state,omitempty"`
|
||||
}
|
||||
|
||||
@@ -19,6 +19,9 @@ import (
|
||||
var flowLogger = netflow.NewManager(nil, []byte{}, nil).GetLogger()
|
||||
|
||||
func TestDefaultManager(t *testing.T) {
|
||||
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
|
||||
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
|
||||
|
||||
networkMap := &mgmProto.NetworkMap{
|
||||
FirewallRules: []*mgmProto.FirewallRule{
|
||||
{
|
||||
@@ -135,6 +138,7 @@ func TestDefaultManager(t *testing.T) {
|
||||
func TestDefaultManagerStateless(t *testing.T) {
|
||||
// stateless currently only in userspace, so we have to disable kernel
|
||||
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
|
||||
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
|
||||
t.Setenv("NB_DISABLE_CONNTRACK", "true")
|
||||
|
||||
networkMap := &mgmProto.NetworkMap{
|
||||
@@ -194,6 +198,7 @@ func TestDefaultManagerStateless(t *testing.T) {
|
||||
// This tests the full ACL manager -> uspfilter integration.
|
||||
func TestDenyRulesNotAccumulatedOnRepeatedApply(t *testing.T) {
|
||||
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
|
||||
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
|
||||
|
||||
networkMap := &mgmProto.NetworkMap{
|
||||
FirewallRules: []*mgmProto.FirewallRule{
|
||||
@@ -258,6 +263,7 @@ func TestDenyRulesNotAccumulatedOnRepeatedApply(t *testing.T) {
|
||||
// up when they're removed from the network map in a subsequent update.
|
||||
func TestDenyRulesCleanedUpOnRemoval(t *testing.T) {
|
||||
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
|
||||
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
@@ -339,6 +345,7 @@ func TestDenyRulesCleanedUpOnRemoval(t *testing.T) {
|
||||
// one added without leaking.
|
||||
func TestRuleUpdateChangingAction(t *testing.T) {
|
||||
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
|
||||
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
@@ -168,6 +168,7 @@ func (m *DefaultManager) setupAndroidRoutes(config ManagerConfig) {
|
||||
NetworkType: route.IPv4Network,
|
||||
}
|
||||
cr = append(cr, fakeIPRoute)
|
||||
m.notifier.SetFakeIPRoute(fakeIPRoute)
|
||||
}
|
||||
|
||||
m.notifier.SetInitialClientRoutes(cr, routesForComparison)
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
type Notifier struct {
|
||||
initialRoutes []*route.Route
|
||||
currentRoutes []*route.Route
|
||||
fakeIPRoute *route.Route
|
||||
|
||||
listener listener.NetworkChangeListener
|
||||
listenerMux sync.Mutex
|
||||
@@ -31,13 +32,17 @@ func (n *Notifier) SetListener(listener listener.NetworkChangeListener) {
|
||||
n.listener = listener
|
||||
}
|
||||
|
||||
// SetInitialClientRoutes stores the full initial route set (including fake IP blocks)
|
||||
// and a separate comparison set (without fake IP blocks) for diff detection.
|
||||
// SetInitialClientRoutes stores the initial route sets for TUN configuration.
|
||||
func (n *Notifier) SetInitialClientRoutes(initialRoutes []*route.Route, routesForComparison []*route.Route) {
|
||||
n.initialRoutes = filterStatic(initialRoutes)
|
||||
n.currentRoutes = filterStatic(routesForComparison)
|
||||
}
|
||||
|
||||
// SetFakeIPRoute stores the fake IP route to be included in every TUN rebuild.
|
||||
func (n *Notifier) SetFakeIPRoute(r *route.Route) {
|
||||
n.fakeIPRoute = r
|
||||
}
|
||||
|
||||
func (n *Notifier) OnNewRoutes(idMap route.HAMap) {
|
||||
var newRoutes []*route.Route
|
||||
for _, routes := range idMap {
|
||||
@@ -69,7 +74,9 @@ func (n *Notifier) notify() {
|
||||
}
|
||||
|
||||
allRoutes := slices.Clone(n.currentRoutes)
|
||||
allRoutes = append(allRoutes, n.extraInitialRoutes()...)
|
||||
if n.fakeIPRoute != nil {
|
||||
allRoutes = append(allRoutes, n.fakeIPRoute)
|
||||
}
|
||||
|
||||
routeStrings := n.routesToStrings(allRoutes)
|
||||
sort.Strings(routeStrings)
|
||||
@@ -78,23 +85,6 @@ func (n *Notifier) notify() {
|
||||
}(n.listener)
|
||||
}
|
||||
|
||||
// extraInitialRoutes returns initialRoutes whose network prefix is absent
|
||||
// from currentRoutes (e.g. the fake IP block added at setup time).
|
||||
func (n *Notifier) extraInitialRoutes() []*route.Route {
|
||||
currentNets := make(map[netip.Prefix]struct{}, len(n.currentRoutes))
|
||||
for _, r := range n.currentRoutes {
|
||||
currentNets[r.Network] = struct{}{}
|
||||
}
|
||||
|
||||
var extra []*route.Route
|
||||
for _, r := range n.initialRoutes {
|
||||
if _, ok := currentNets[r.Network]; !ok {
|
||||
extra = append(extra, r)
|
||||
}
|
||||
}
|
||||
return extra
|
||||
}
|
||||
|
||||
func filterStatic(routes []*route.Route) []*route.Route {
|
||||
out := make([]*route.Route, 0, len(routes))
|
||||
for _, r := range routes {
|
||||
|
||||
@@ -34,6 +34,10 @@ func (n *Notifier) SetInitialClientRoutes([]*route.Route, []*route.Route) {
|
||||
// iOS doesn't care about initial routes
|
||||
}
|
||||
|
||||
func (n *Notifier) SetFakeIPRoute(*route.Route) {
|
||||
// Not used on iOS
|
||||
}
|
||||
|
||||
func (n *Notifier) OnNewRoutes(route.HAMap) {
|
||||
// Not used on iOS
|
||||
}
|
||||
|
||||
@@ -23,6 +23,10 @@ func (n *Notifier) SetInitialClientRoutes([]*route.Route, []*route.Route) {
|
||||
// Not used on non-mobile platforms
|
||||
}
|
||||
|
||||
func (n *Notifier) SetFakeIPRoute(*route.Route) {
|
||||
// Not used on non-mobile platforms
|
||||
}
|
||||
|
||||
func (n *Notifier) OnNewRoutes(idMap route.HAMap) {
|
||||
// Not used on non-mobile platforms
|
||||
}
|
||||
|
||||
@@ -179,9 +179,11 @@ type StoreConfig struct {
|
||||
|
||||
// ReverseProxyConfig contains reverse proxy settings
|
||||
type ReverseProxyConfig struct {
|
||||
TrustedHTTPProxies []string `yaml:"trustedHTTPProxies"`
|
||||
TrustedHTTPProxiesCount uint `yaml:"trustedHTTPProxiesCount"`
|
||||
TrustedPeers []string `yaml:"trustedPeers"`
|
||||
TrustedHTTPProxies []string `yaml:"trustedHTTPProxies"`
|
||||
TrustedHTTPProxiesCount uint `yaml:"trustedHTTPProxiesCount"`
|
||||
TrustedPeers []string `yaml:"trustedPeers"`
|
||||
AccessLogRetentionDays int `yaml:"accessLogRetentionDays"`
|
||||
AccessLogCleanupIntervalHours int `yaml:"accessLogCleanupIntervalHours"`
|
||||
}
|
||||
|
||||
// DefaultConfig returns a CombinedConfig with default values
|
||||
@@ -645,7 +647,9 @@ func (c *CombinedConfig) ToManagementConfig() (*nbconfig.Config, error) {
|
||||
|
||||
// Build reverse proxy config
|
||||
reverseProxy := nbconfig.ReverseProxy{
|
||||
TrustedHTTPProxiesCount: mgmt.ReverseProxy.TrustedHTTPProxiesCount,
|
||||
TrustedHTTPProxiesCount: mgmt.ReverseProxy.TrustedHTTPProxiesCount,
|
||||
AccessLogRetentionDays: mgmt.ReverseProxy.AccessLogRetentionDays,
|
||||
AccessLogCleanupIntervalHours: mgmt.ReverseProxy.AccessLogCleanupIntervalHours,
|
||||
}
|
||||
for _, p := range mgmt.ReverseProxy.TrustedHTTPProxies {
|
||||
if prefix, err := netip.ParsePrefix(p); err == nil {
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -26,11 +25,22 @@ import (
|
||||
"github.com/netbirdio/netbird/util/wsproxy"
|
||||
)
|
||||
|
||||
var ErrClientClosed = errors.New("client is closed")
|
||||
|
||||
// minHealthyDuration is the minimum time a stream must survive before a failure
|
||||
// resets the backoff timer. Streams that fail faster are considered unhealthy and
|
||||
// should not reset backoff, so that MaxElapsedTime can eventually stop retries.
|
||||
const minHealthyDuration = 5 * time.Second
|
||||
|
||||
type GRPCClient struct {
|
||||
realClient proto.FlowServiceClient
|
||||
clientConn *grpc.ClientConn
|
||||
stream proto.FlowService_EventsClient
|
||||
streamMu sync.Mutex
|
||||
target string
|
||||
opts []grpc.DialOption
|
||||
closed bool // prevent creating conn in the middle of the Close
|
||||
receiving bool // prevent concurrent Receive calls
|
||||
mu sync.Mutex // protects clientConn, realClient, stream, closed, and receiving
|
||||
}
|
||||
|
||||
func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
|
||||
@@ -65,7 +75,8 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
|
||||
grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
|
||||
)
|
||||
|
||||
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
|
||||
target := parsedURL.Host
|
||||
conn, err := grpc.NewClient(target, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating new grpc client: %w", err)
|
||||
}
|
||||
@@ -73,30 +84,73 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
|
||||
return &GRPCClient{
|
||||
realClient: proto.NewFlowServiceClient(conn),
|
||||
clientConn: conn,
|
||||
target: target,
|
||||
opts: opts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Close() error {
|
||||
c.streamMu.Lock()
|
||||
defer c.streamMu.Unlock()
|
||||
|
||||
c.mu.Lock()
|
||||
c.closed = true
|
||||
c.stream = nil
|
||||
if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
conn := c.clientConn
|
||||
c.clientConn = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
if conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
return fmt.Errorf("close client connection: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
|
||||
c.mu.Lock()
|
||||
stream := c.stream
|
||||
c.mu.Unlock()
|
||||
|
||||
if stream == nil {
|
||||
return errors.New("stream not initialized")
|
||||
}
|
||||
|
||||
if err := stream.Send(event); err != nil {
|
||||
return fmt.Errorf("send flow event: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
c.mu.Lock()
|
||||
if c.receiving {
|
||||
c.mu.Unlock()
|
||||
return errors.New("concurrent Receive calls are not supported")
|
||||
}
|
||||
c.receiving = true
|
||||
c.mu.Unlock()
|
||||
defer func() {
|
||||
c.mu.Lock()
|
||||
c.receiving = false
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
|
||||
backOff := defaultBackoff(ctx, interval)
|
||||
operation := func() error {
|
||||
if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
return fmt.Errorf("receive: %w: %w", err, context.Canceled)
|
||||
}
|
||||
stream, err := c.establishStream(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("failed to establish flow stream, retrying: %v", err)
|
||||
return c.handleRetryableError(err, time.Time{}, backOff)
|
||||
}
|
||||
|
||||
streamStart := time.Now()
|
||||
|
||||
if err := c.receive(stream, msgHandler); err != nil {
|
||||
log.Errorf("receive failed: %v", err)
|
||||
return fmt.Errorf("receive: %w", err)
|
||||
return c.handleRetryableError(err, streamStart, backOff)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -108,37 +162,106 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
if c.clientConn.GetState() == connectivity.Shutdown {
|
||||
return errors.New("connection to flow receiver has been shut down")
|
||||
// handleRetryableError resets the backoff timer if the stream was healthy long
|
||||
// enough and recreates the underlying ClientConn so that gRPC's internal
|
||||
// subchannel backoff does not accumulate and compete with our own retry timer.
|
||||
// A zero streamStart means the stream was never established.
|
||||
func (c *GRPCClient) handleRetryableError(err error, streamStart time.Time, backOff backoff.BackOff) error {
|
||||
if isContextDone(err) {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create event stream: %w", err)
|
||||
var permErr *backoff.PermanentError
|
||||
if errors.As(err, &permErr) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = stream.Send(&proto.FlowEvent{IsInitiator: true})
|
||||
// Reset the backoff so the next retry starts with a short delay instead of
|
||||
// continuing the already-elapsed timer. Only do this if the stream was healthy
|
||||
// long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime.
|
||||
if !streamStart.IsZero() && time.Since(streamStart) >= minHealthyDuration {
|
||||
backOff.Reset()
|
||||
}
|
||||
|
||||
if recreateErr := c.recreateConnection(); recreateErr != nil {
|
||||
log.Errorf("recreate connection: %v", recreateErr)
|
||||
return recreateErr
|
||||
}
|
||||
|
||||
log.Infof("connection recreated, retrying stream")
|
||||
return fmt.Errorf("retrying after error: %w", err)
|
||||
}
|
||||
|
||||
func (c *GRPCClient) recreateConnection() error {
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
|
||||
conn, err := grpc.NewClient(c.target, c.opts...)
|
||||
if err != nil {
|
||||
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
|
||||
c.mu.Unlock()
|
||||
return fmt.Errorf("create new connection: %w", err)
|
||||
}
|
||||
|
||||
old := c.clientConn
|
||||
c.clientConn = conn
|
||||
c.realClient = proto.NewFlowServiceClient(conn)
|
||||
c.stream = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
_ = old.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) establishStream(ctx context.Context) (proto.FlowService_EventsClient, error) {
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return nil, backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
cl := c.realClient
|
||||
c.mu.Unlock()
|
||||
|
||||
// open stream outside the lock — blocking operation
|
||||
stream, err := cl.Events(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create event stream: %w", err)
|
||||
}
|
||||
streamReady := false
|
||||
defer func() {
|
||||
if !streamReady {
|
||||
_ = stream.CloseSend()
|
||||
}
|
||||
}()
|
||||
|
||||
if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
|
||||
return nil, fmt.Errorf("send initiator: %w", err)
|
||||
}
|
||||
|
||||
if err = checkHeader(stream); err != nil {
|
||||
return fmt.Errorf("check header: %w", err)
|
||||
return nil, fmt.Errorf("check header: %w", err)
|
||||
}
|
||||
|
||||
c.streamMu.Lock()
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return nil, backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
c.stream = stream
|
||||
c.streamMu.Unlock()
|
||||
c.mu.Unlock()
|
||||
streamReady = true
|
||||
|
||||
return c.receive(stream, msgHandler)
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive from stream: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if msg.IsInitiator {
|
||||
@@ -169,7 +292,7 @@ func checkHeader(stream proto.FlowService_EventsClient) error {
|
||||
func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff {
|
||||
return backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: 1,
|
||||
RandomizationFactor: 0.5,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: interval / 2,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
@@ -178,18 +301,12 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
|
||||
}, ctx)
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
|
||||
c.streamMu.Lock()
|
||||
stream := c.stream
|
||||
c.streamMu.Unlock()
|
||||
|
||||
if stream == nil {
|
||||
return errors.New("stream not initialized")
|
||||
func isContextDone(err error) bool {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
if err := stream.Send(event); err != nil {
|
||||
return fmt.Errorf("send flow event: %w", err)
|
||||
if s, ok := status.FromError(err); ok {
|
||||
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
|
||||
}
|
||||
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -2,8 +2,11 @@ package client_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -11,6 +14,8 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
flow "github.com/netbirdio/netbird/flow/client"
|
||||
"github.com/netbirdio/netbird/flow/proto"
|
||||
@@ -18,21 +23,89 @@ import (
|
||||
|
||||
type testServer struct {
|
||||
proto.UnimplementedFlowServiceServer
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
listener *connTrackListener
|
||||
closeStream chan struct{} // signal server to close the stream
|
||||
handlerDone chan struct{} // signaled each time Events() exits
|
||||
handlerStarted chan struct{} // signaled each time Events() begins
|
||||
}
|
||||
|
||||
// connTrackListener wraps a net.Listener to track accepted connections
|
||||
// so tests can forcefully close them to simulate PROTOCOL_ERROR/RST_STREAM.
|
||||
type connTrackListener struct {
|
||||
net.Listener
|
||||
mu sync.Mutex
|
||||
conns []net.Conn
|
||||
}
|
||||
|
||||
func (l *connTrackListener) Accept() (net.Conn, error) {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.mu.Lock()
|
||||
l.conns = append(l.conns, c)
|
||||
l.mu.Unlock()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// sendRSTStream writes a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR
|
||||
// (error code 0x1) on every tracked connection. This produces the exact error:
|
||||
//
|
||||
// rpc error: code = Internal desc = stream terminated by RST_STREAM with error code: PROTOCOL_ERROR
|
||||
//
|
||||
// HTTP/2 RST_STREAM frame format (9-byte header + 4-byte payload):
|
||||
//
|
||||
// Length (3 bytes): 0x000004
|
||||
// Type (1 byte): 0x03 (RST_STREAM)
|
||||
// Flags (1 byte): 0x00
|
||||
// Stream ID (4 bytes): target stream (must have bit 31 clear)
|
||||
// Error Code (4 bytes): 0x00000001 (PROTOCOL_ERROR)
|
||||
func (l *connTrackListener) connCount() int {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return len(l.conns)
|
||||
}
|
||||
|
||||
func (l *connTrackListener) sendRSTStream(streamID uint32) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
frame := make([]byte, 13) // 9-byte header + 4-byte payload
|
||||
// Length = 4 (3 bytes, big-endian)
|
||||
frame[0], frame[1], frame[2] = 0, 0, 4
|
||||
// Type = RST_STREAM (0x03)
|
||||
frame[3] = 0x03
|
||||
// Flags = 0
|
||||
frame[4] = 0x00
|
||||
// Stream ID (4 bytes, big-endian, bit 31 reserved = 0)
|
||||
binary.BigEndian.PutUint32(frame[5:9], streamID)
|
||||
// Error Code = PROTOCOL_ERROR (0x1)
|
||||
binary.BigEndian.PutUint32(frame[9:13], 0x1)
|
||||
|
||||
for _, c := range l.conns {
|
||||
_, _ = c.Write(frame)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) *testServer {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
rawListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
listener := &connTrackListener{Listener: rawListener}
|
||||
|
||||
s := &testServer{
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: listener.Addr().String(),
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: rawListener.Addr().String(),
|
||||
listener: listener,
|
||||
closeStream: make(chan struct{}, 1),
|
||||
handlerDone: make(chan struct{}, 10),
|
||||
handlerStarted: make(chan struct{}, 10),
|
||||
}
|
||||
|
||||
proto.RegisterFlowServiceServer(s.grpcSrv, s)
|
||||
@@ -51,11 +124,23 @@ func newTestServer(t *testing.T) *testServer {
|
||||
}
|
||||
|
||||
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
defer func() {
|
||||
select {
|
||||
case s.handlerDone <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case s.handlerStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
@@ -91,6 +176,8 @@ func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
if err := stream.Send(ack); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-s.closeStream:
|
||||
return status.Errorf(codes.Internal, "server closing stream")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
@@ -110,16 +197,13 @@ func TestReceive(t *testing.T) {
|
||||
assert.NoError(t, err, "failed to close flow")
|
||||
})
|
||||
|
||||
receivedAcks := make(map[string]bool)
|
||||
var ackCount atomic.Int32
|
||||
receiveDone := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
if !msg.IsInitiator && len(msg.EventId) > 0 {
|
||||
id := string(msg.EventId)
|
||||
receivedAcks[id] = true
|
||||
|
||||
if len(receivedAcks) >= 3 {
|
||||
if ackCount.Add(1) >= 3 {
|
||||
close(receiveDone)
|
||||
}
|
||||
}
|
||||
@@ -130,7 +214,11 @@ func TestReceive(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
eventID := uuid.New().String()
|
||||
@@ -153,7 +241,7 @@ func TestReceive(t *testing.T) {
|
||||
t.Fatal("timeout waiting for acks to be processed")
|
||||
}
|
||||
|
||||
assert.Equal(t, 3, len(receivedAcks))
|
||||
assert.Equal(t, int32(3), ackCount.Load())
|
||||
}
|
||||
|
||||
func TestReceive_ContextCancellation(t *testing.T) {
|
||||
@@ -254,3 +342,195 @@ func TestSend(t *testing.T) {
|
||||
t.Fatal("timeout waiting for ack to be received by flow")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewClient_PermanentClose(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = client.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
require.ErrorIs(t, err, flow.ErrClientClosed)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not return after Close — stuck in retry loop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewClient_CloseVerify(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
closeDone := make(chan struct{}, 1)
|
||||
go func() {
|
||||
_ = client.Close()
|
||||
closeDone <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
require.Error(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not return after Close — stuck in retry loop")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-closeDone:
|
||||
return
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Close did not return — blocked in retry loop")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestClose_WhileReceiving(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background() // no timeout — intentional
|
||||
receiveDone := make(chan struct{})
|
||||
go func() {
|
||||
_ = client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
close(receiveDone)
|
||||
}()
|
||||
|
||||
// Wait for the server-side handler to confirm the stream is established.
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
|
||||
closeDone := make(chan struct{})
|
||||
go func() {
|
||||
_ = client.Close()
|
||||
close(closeDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-closeDone:
|
||||
// Close returned — good
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Close blocked forever — Receive stuck in retry loop")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-receiveDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not exit after Close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReceive_ProtocolErrorStreamReconnect(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
err := client.Close()
|
||||
assert.NoError(t, err, "failed to close flow")
|
||||
})
|
||||
|
||||
// Track acks received before and after server-side stream close
|
||||
var ackCount atomic.Int32
|
||||
receivedFirst := make(chan struct{})
|
||||
receivedAfterReconnect := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
if msg.IsInitiator || len(msg.EventId) == 0 {
|
||||
return nil
|
||||
}
|
||||
n := ackCount.Add(1)
|
||||
if n == 1 {
|
||||
close(receivedFirst)
|
||||
}
|
||||
if n == 2 {
|
||||
close(receivedAfterReconnect)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
t.Logf("receive error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for stream to be established, then send first ack
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")}
|
||||
|
||||
select {
|
||||
case <-receivedFirst:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for first ack")
|
||||
}
|
||||
|
||||
// Snapshot connection count before injecting the fault.
|
||||
connsBefore := server.listener.connCount()
|
||||
|
||||
// Send a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR on the TCP connection.
|
||||
// gRPC multiplexes streams on stream IDs 1, 3, 5, ... (odd, client-initiated).
|
||||
// Stream ID 1 is the client's first stream (our Events bidi stream).
|
||||
// This produces the exact error the client sees in production:
|
||||
// "stream terminated by RST_STREAM with error code: PROTOCOL_ERROR"
|
||||
server.listener.sendRSTStream(1)
|
||||
|
||||
// Wait for the old Events() handler to fully exit so it can no longer
|
||||
// drain s.acks and drop our injected ack on a broken stream.
|
||||
select {
|
||||
case <-server.handlerDone:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("old Events() handler did not exit after RST_STREAM")
|
||||
}
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return server.listener.connCount() > connsBefore
|
||||
}, 5*time.Second, 50*time.Millisecond, "client did not open a new TCP connection after RST_STREAM")
|
||||
|
||||
server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")}
|
||||
|
||||
select {
|
||||
case <-receivedAfterReconnect:
|
||||
// Client successfully reconnected and received ack after server-side stream close
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for ack after server-side stream close — client did not reconnect")
|
||||
}
|
||||
|
||||
assert.GreaterOrEqual(t, int(ackCount.Load()), 2, "should have received acks before and after stream close")
|
||||
assert.GreaterOrEqual(t, server.listener.connCount(), 2, "client should have created at least 2 TCP connections (original + reconnect)")
|
||||
}
|
||||
|
||||
@@ -302,7 +302,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "rate(management_account_peer_meta_update_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"expr": "rate(management_account_peer_meta_update_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
|
||||
"range": true,
|
||||
@@ -410,7 +410,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": true,
|
||||
@@ -426,7 +426,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -443,7 +443,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -545,7 +545,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": true,
|
||||
@@ -561,7 +561,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -578,7 +578,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -694,7 +694,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.5,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": true,
|
||||
@@ -710,7 +710,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.9,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -727,7 +727,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"expr": "histogram_quantile(0.99,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"hide": false,
|
||||
@@ -841,7 +841,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"instant": false,
|
||||
"legendFormat": "p50",
|
||||
"range": true,
|
||||
@@ -853,7 +853,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p90",
|
||||
@@ -866,7 +866,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p99",
|
||||
@@ -963,7 +963,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"instant": false,
|
||||
"legendFormat": "p50",
|
||||
"range": true,
|
||||
@@ -975,7 +975,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p90",
|
||||
@@ -988,7 +988,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p99",
|
||||
@@ -1085,7 +1085,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"instant": false,
|
||||
"legendFormat": "p50",
|
||||
"range": true,
|
||||
@@ -1097,7 +1097,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p90",
|
||||
@@ -1110,7 +1110,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p99",
|
||||
@@ -1221,7 +1221,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "rate(management_idp_authenticate_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"expr": "rate(management_idp_authenticate_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
|
||||
"range": true,
|
||||
@@ -1317,7 +1317,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "rate(management_idp_get_account_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"expr": "rate(management_idp_get_account_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
|
||||
"range": true,
|
||||
@@ -1413,7 +1413,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "rate(management_idp_update_user_meta_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"expr": "rate(management_idp_update_user_meta_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
|
||||
"instant": false,
|
||||
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
|
||||
"range": true,
|
||||
@@ -1523,7 +1523,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"GET|OPTIONS\"}[$__rate_interval])) by (job,method)",
|
||||
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"GET|OPTIONS\"}[$__rate_interval])) by (job,method)",
|
||||
"instant": false,
|
||||
"legendFormat": "{{method}}",
|
||||
"range": true,
|
||||
@@ -1619,7 +1619,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"POST|PUT|DELETE\"}[$__rate_interval])) by (job,method)",
|
||||
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"POST|PUT|DELETE\"}[$__rate_interval])) by (job,method)",
|
||||
"instant": false,
|
||||
"legendFormat": "{{method}}",
|
||||
"range": true,
|
||||
@@ -1715,7 +1715,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"instant": false,
|
||||
"legendFormat": "p50",
|
||||
"range": true,
|
||||
@@ -1727,7 +1727,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p90",
|
||||
@@ -1740,7 +1740,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p99",
|
||||
@@ -1837,7 +1837,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"instant": false,
|
||||
"legendFormat": "p50",
|
||||
"range": true,
|
||||
@@ -1849,7 +1849,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p90",
|
||||
@@ -1862,7 +1862,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "p99",
|
||||
@@ -1963,7 +1963,7 @@
|
||||
"uid": "${datasource}"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (job,exported_endpoint,method)",
|
||||
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (job,exported_endpoint,method)",
|
||||
"hide": false,
|
||||
"instant": false,
|
||||
"legendFormat": "{{method}}-{{exported_endpoint}}",
|
||||
@@ -3222,7 +3222,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "builder",
|
||||
"expr": "sum by(le) (increase(management_grpc_updatechannel_queue_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
|
||||
"expr": "sum by(le) (increase(management_grpc_updatechannel_queue_length_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": true,
|
||||
@@ -3323,7 +3323,7 @@
|
||||
},
|
||||
"disableTextWrap": false,
|
||||
"editorMode": "builder",
|
||||
"expr": "sum by(le) (increase(management_account_update_account_peers_duration_ms_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
|
||||
"expr": "sum by(le) (increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
|
||||
"format": "heatmap",
|
||||
"fullMetaSearch": false,
|
||||
"includeNullMetadata": true,
|
||||
|
||||
@@ -106,13 +106,23 @@ func (m *managerImpl) CleanupOldAccessLogs(ctx context.Context, retentionDays in
|
||||
|
||||
// StartPeriodicCleanup starts a background goroutine that periodically cleans up old access logs
|
||||
func (m *managerImpl) StartPeriodicCleanup(ctx context.Context, retentionDays, cleanupIntervalHours int) {
|
||||
if retentionDays <= 0 {
|
||||
log.WithContext(ctx).Debug("periodic access log cleanup disabled: retention days is 0 or negative")
|
||||
if retentionDays < 0 {
|
||||
log.WithContext(ctx).Debug("periodic access log cleanup disabled: retention days is negative")
|
||||
return
|
||||
}
|
||||
|
||||
if retentionDays == 0 {
|
||||
retentionDays = 7
|
||||
log.WithContext(ctx).Debugf("no retention days specified for access log cleanup, defaulting to %d days", retentionDays)
|
||||
} else {
|
||||
log.WithContext(ctx).Debugf("access log retention period set to %d days", retentionDays)
|
||||
}
|
||||
|
||||
if cleanupIntervalHours <= 0 {
|
||||
cleanupIntervalHours = 24
|
||||
log.WithContext(ctx).Debugf("no cleanup interval specified for access log cleanup, defaulting to %d hours", cleanupIntervalHours)
|
||||
} else {
|
||||
log.WithContext(ctx).Debugf("access log cleanup interval set to %d hours", cleanupIntervalHours)
|
||||
}
|
||||
|
||||
cleanupCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
@@ -121,7 +121,7 @@ func TestCleanupWithExactBoundary(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStartPeriodicCleanup(t *testing.T) {
|
||||
t.Run("periodic cleanup disabled with zero retention", func(t *testing.T) {
|
||||
t.Run("periodic cleanup disabled with negative retention", func(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
|
||||
@@ -135,7 +135,7 @@ func TestStartPeriodicCleanup(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
manager.StartPeriodicCleanup(ctx, 0, 1)
|
||||
manager.StartPeriodicCleanup(ctx, -1, 1)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
|
||||
@@ -30,3 +30,8 @@ func (d *Domain) EventMeta() map[string]any {
|
||||
"validated": d.Validated,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Domain) Copy() *Domain {
|
||||
dCopy := *d
|
||||
return &dCopy
|
||||
}
|
||||
|
||||
@@ -203,7 +203,7 @@ type ReverseProxy struct {
|
||||
|
||||
// AccessLogRetentionDays specifies the number of days to retain access logs.
|
||||
// Logs older than this duration will be automatically deleted during cleanup.
|
||||
// A value of 0 or negative means logs are kept indefinitely (no cleanup).
|
||||
// A value of 0 will default to 7 days. Negative means logs are kept indefinitely (no cleanup).
|
||||
AccessLogRetentionDays int
|
||||
|
||||
// AccessLogCleanupIntervalHours specifies how often (in hours) to run the cleanup routine.
|
||||
|
||||
@@ -742,11 +742,6 @@ func (am *DefaultAccountManager) DeleteAccount(ctx context.Context, accountID, u
|
||||
return status.Errorf(status.Internal, "failed to build user infos for account %s: %v", accountID, err)
|
||||
}
|
||||
|
||||
err = am.serviceManager.DeleteAllServices(ctx, accountID, userID)
|
||||
if err != nil {
|
||||
return status.Errorf(status.Internal, "failed to delete service %s: %v", accountID, err)
|
||||
}
|
||||
|
||||
for _, otherUser := range account.Users {
|
||||
if otherUser.Id == userID {
|
||||
continue
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
"github.com/prometheus/client_golang/prometheus/push"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -23,6 +22,9 @@ import (
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||
@@ -1815,6 +1817,13 @@ func TestAccount_Copy(t *testing.T) {
|
||||
Targets: []*service.Target{},
|
||||
},
|
||||
},
|
||||
Domains: []*domain.Domain{
|
||||
{
|
||||
ID: "domain1",
|
||||
Domain: "test.com",
|
||||
AccountID: "account1",
|
||||
},
|
||||
},
|
||||
NetworkMapCache: &types.NetworkMapBuilder{},
|
||||
}
|
||||
account.InitOnce()
|
||||
|
||||
@@ -489,6 +489,102 @@ func MigrateJsonToTable[T any](ctx context.Context, db *gorm.DB, columnName stri
|
||||
return nil
|
||||
}
|
||||
|
||||
// hasForeignKey checks whether a foreign key constraint exists on the given table and column.
|
||||
func hasForeignKey(db *gorm.DB, table, column string) bool {
|
||||
var count int64
|
||||
|
||||
switch db.Name() {
|
||||
case "postgres":
|
||||
db.Raw(`
|
||||
SELECT COUNT(*) FROM information_schema.key_column_usage kcu
|
||||
JOIN information_schema.table_constraints tc
|
||||
ON tc.constraint_name = kcu.constraint_name
|
||||
AND tc.table_schema = kcu.table_schema
|
||||
WHERE tc.constraint_type = 'FOREIGN KEY'
|
||||
AND kcu.table_name = ?
|
||||
AND kcu.column_name = ?
|
||||
`, table, column).Scan(&count)
|
||||
case "mysql":
|
||||
db.Raw(`
|
||||
SELECT COUNT(*) FROM information_schema.key_column_usage
|
||||
WHERE table_schema = DATABASE()
|
||||
AND table_name = ?
|
||||
AND column_name = ?
|
||||
AND referenced_table_name IS NOT NULL
|
||||
`, table, column).Scan(&count)
|
||||
default: // sqlite
|
||||
type fkInfo struct {
|
||||
From string
|
||||
}
|
||||
var fks []fkInfo
|
||||
db.Raw(fmt.Sprintf("PRAGMA foreign_key_list(%s)", table)).Scan(&fks)
|
||||
for _, fk := range fks {
|
||||
if fk.From == column {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
return count > 0
|
||||
}
|
||||
|
||||
// CleanupOrphanedResources deletes rows from the table of model T where the foreign
|
||||
// key column (fkColumn) references a row in the table of model R that no longer exists.
|
||||
func CleanupOrphanedResources[T any, R any](ctx context.Context, db *gorm.DB, fkColumn string) error {
|
||||
var model T
|
||||
var refModel R
|
||||
|
||||
if !db.Migrator().HasTable(&model) {
|
||||
log.WithContext(ctx).Debugf("table for %T does not exist, no cleanup needed", model)
|
||||
return nil
|
||||
}
|
||||
|
||||
if !db.Migrator().HasTable(&refModel) {
|
||||
log.WithContext(ctx).Debugf("referenced table for %T does not exist, no cleanup needed", refModel)
|
||||
return nil
|
||||
}
|
||||
|
||||
stmtT := &gorm.Statement{DB: db}
|
||||
if err := stmtT.Parse(&model); err != nil {
|
||||
return fmt.Errorf("parse model %T: %w", model, err)
|
||||
}
|
||||
childTable := stmtT.Schema.Table
|
||||
|
||||
stmtR := &gorm.Statement{DB: db}
|
||||
if err := stmtR.Parse(&refModel); err != nil {
|
||||
return fmt.Errorf("parse reference model %T: %w", refModel, err)
|
||||
}
|
||||
parentTable := stmtR.Schema.Table
|
||||
|
||||
if !db.Migrator().HasColumn(&model, fkColumn) {
|
||||
log.WithContext(ctx).Debugf("column %s does not exist in table %s, no cleanup needed", fkColumn, childTable)
|
||||
return nil
|
||||
}
|
||||
|
||||
// If a foreign key constraint already exists on the column, the DB itself
|
||||
// enforces referential integrity and orphaned rows cannot exist.
|
||||
if hasForeignKey(db, childTable, fkColumn) {
|
||||
log.WithContext(ctx).Debugf("foreign key constraint for %s already exists on %s, no cleanup needed", fkColumn, childTable)
|
||||
return nil
|
||||
}
|
||||
|
||||
result := db.Exec(
|
||||
fmt.Sprintf(
|
||||
"DELETE FROM %s WHERE %s NOT IN (SELECT id FROM %s)",
|
||||
childTable, fkColumn, parentTable,
|
||||
),
|
||||
)
|
||||
if result.Error != nil {
|
||||
return fmt.Errorf("cleanup orphaned rows in %s: %w", childTable, result.Error)
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Infof("Cleaned up %d orphaned rows from %s where %s had no matching row in %s",
|
||||
result.RowsAffected, childTable, fkColumn, parentTable)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func RemoveDuplicatePeerKeys(ctx context.Context, db *gorm.DB) error {
|
||||
if !db.Migrator().HasTable("peers") {
|
||||
log.WithContext(ctx).Debug("peers table does not exist, skipping duplicate key cleanup")
|
||||
|
||||
@@ -441,3 +441,197 @@ func TestRemoveDuplicatePeerKeys_NoTable(t *testing.T) {
|
||||
err := migration.RemoveDuplicatePeerKeys(context.Background(), db)
|
||||
require.NoError(t, err, "Should not fail when table does not exist")
|
||||
}
|
||||
|
||||
type testParent struct {
|
||||
ID string `gorm:"primaryKey"`
|
||||
}
|
||||
|
||||
func (testParent) TableName() string {
|
||||
return "test_parents"
|
||||
}
|
||||
|
||||
type testChild struct {
|
||||
ID string `gorm:"primaryKey"`
|
||||
ParentID string
|
||||
}
|
||||
|
||||
func (testChild) TableName() string {
|
||||
return "test_children"
|
||||
}
|
||||
|
||||
type testChildWithFK struct {
|
||||
ID string `gorm:"primaryKey"`
|
||||
ParentID string `gorm:"index"`
|
||||
Parent *testParent `gorm:"foreignKey:ParentID"`
|
||||
}
|
||||
|
||||
func (testChildWithFK) TableName() string {
|
||||
return "test_children"
|
||||
}
|
||||
|
||||
func setupOrphanTestDB(t *testing.T, models ...any) *gorm.DB {
|
||||
t.Helper()
|
||||
db := setupDatabase(t)
|
||||
for _, m := range models {
|
||||
_ = db.Migrator().DropTable(m)
|
||||
}
|
||||
err := db.AutoMigrate(models...)
|
||||
require.NoError(t, err, "Failed to auto-migrate tables")
|
||||
return db
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_NoChildTable(t *testing.T) {
|
||||
db := setupDatabase(t)
|
||||
_ = db.Migrator().DropTable(&testChild{})
|
||||
_ = db.Migrator().DropTable(&testParent{})
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err, "Should not fail when child table does not exist")
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_NoParentTable(t *testing.T) {
|
||||
db := setupDatabase(t)
|
||||
_ = db.Migrator().DropTable(&testParent{})
|
||||
_ = db.Migrator().DropTable(&testChild{})
|
||||
|
||||
err := db.AutoMigrate(&testChild{})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err, "Should not fail when parent table does not exist")
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_EmptyTables(t *testing.T) {
|
||||
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err, "Should not fail on empty tables")
|
||||
|
||||
var count int64
|
||||
db.Model(&testChild{}).Count(&count)
|
||||
assert.Equal(t, int64(0), count)
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_NoOrphans(t *testing.T) {
|
||||
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
|
||||
|
||||
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
|
||||
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testChild{ID: "c2", ParentID: "p2"}).Error)
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
var count int64
|
||||
db.Model(&testChild{}).Count(&count)
|
||||
assert.Equal(t, int64(2), count, "All children should remain when no orphans")
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_AllOrphans(t *testing.T) {
|
||||
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
|
||||
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c1", "gone1").Error)
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c2", "gone2").Error)
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c3", "gone3").Error)
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
var count int64
|
||||
db.Model(&testChild{}).Count(&count)
|
||||
assert.Equal(t, int64(0), count, "All orphaned children should be deleted")
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_MixedValidAndOrphaned(t *testing.T) {
|
||||
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
|
||||
|
||||
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
|
||||
|
||||
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testChild{ID: "c2", ParentID: "p2"}).Error)
|
||||
require.NoError(t, db.Create(&testChild{ID: "c3", ParentID: "p1"}).Error)
|
||||
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c4", "gone1").Error)
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c5", "gone2").Error)
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
var remaining []testChild
|
||||
require.NoError(t, db.Order("id").Find(&remaining).Error)
|
||||
|
||||
assert.Len(t, remaining, 3, "Only valid children should remain")
|
||||
assert.Equal(t, "c1", remaining[0].ID)
|
||||
assert.Equal(t, "c2", remaining[1].ID)
|
||||
assert.Equal(t, "c3", remaining[2].ID)
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_Idempotent(t *testing.T) {
|
||||
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
|
||||
|
||||
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
|
||||
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c2", "gone").Error)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
err := migration.CleanupOrphanedResources[testChild, testParent](ctx, db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
var count int64
|
||||
db.Model(&testChild{}).Count(&count)
|
||||
assert.Equal(t, int64(1), count)
|
||||
|
||||
err = migration.CleanupOrphanedResources[testChild, testParent](ctx, db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
db.Model(&testChild{}).Count(&count)
|
||||
assert.Equal(t, int64(1), count, "Count should remain the same after second run")
|
||||
}
|
||||
|
||||
func TestCleanupOrphanedResources_SkipsWhenForeignKeyExists(t *testing.T) {
|
||||
engine := os.Getenv("NETBIRD_STORE_ENGINE")
|
||||
if engine != "postgres" && engine != "mysql" {
|
||||
t.Skip("FK constraint early-exit test requires postgres or mysql")
|
||||
}
|
||||
|
||||
db := setupDatabase(t)
|
||||
_ = db.Migrator().DropTable(&testChildWithFK{})
|
||||
_ = db.Migrator().DropTable(&testParent{})
|
||||
|
||||
err := db.AutoMigrate(&testParent{}, &testChildWithFK{})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
|
||||
require.NoError(t, db.Create(&testChildWithFK{ID: "c1", ParentID: "p1"}).Error)
|
||||
require.NoError(t, db.Create(&testChildWithFK{ID: "c2", ParentID: "p2"}).Error)
|
||||
|
||||
switch engine {
|
||||
case "postgres":
|
||||
require.NoError(t, db.Exec("ALTER TABLE test_children DROP CONSTRAINT fk_test_children_parent").Error)
|
||||
require.NoError(t, db.Exec("DELETE FROM test_parents WHERE id = ?", "p2").Error)
|
||||
require.NoError(t, db.Exec(
|
||||
"ALTER TABLE test_children ADD CONSTRAINT fk_test_children_parent "+
|
||||
"FOREIGN KEY (parent_id) REFERENCES test_parents(id) NOT VALID",
|
||||
).Error)
|
||||
case "mysql":
|
||||
require.NoError(t, db.Exec("SET FOREIGN_KEY_CHECKS = 0").Error)
|
||||
require.NoError(t, db.Exec("ALTER TABLE test_children DROP FOREIGN KEY fk_test_children_parent").Error)
|
||||
require.NoError(t, db.Exec("DELETE FROM test_parents WHERE id = ?", "p2").Error)
|
||||
require.NoError(t, db.Exec(
|
||||
"ALTER TABLE test_children ADD CONSTRAINT fk_test_children_parent "+
|
||||
"FOREIGN KEY (parent_id) REFERENCES test_parents(id)",
|
||||
).Error)
|
||||
require.NoError(t, db.Exec("SET FOREIGN_KEY_CHECKS = 1").Error)
|
||||
}
|
||||
|
||||
err = migration.CleanupOrphanedResources[testChildWithFK, testParent](context.Background(), db, "parent_id")
|
||||
require.NoError(t, err)
|
||||
|
||||
var count int64
|
||||
db.Model(&testChildWithFK{}).Count(&count)
|
||||
assert.Equal(t, int64(2), count, "Both rows should survive — migration must skip when FK constraint exists")
|
||||
}
|
||||
|
||||
@@ -396,6 +396,11 @@ func (s *SqlStore) DeleteAccount(ctx context.Context, account *types.Account) er
|
||||
return result.Error
|
||||
}
|
||||
|
||||
result = tx.Select(clause.Associations).Delete(account.Services, "account_id = ?", account.Id)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
}
|
||||
|
||||
result = tx.Select(clause.Associations).Delete(account)
|
||||
if result.Error != nil {
|
||||
return result.Error
|
||||
@@ -2099,7 +2104,8 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
|
||||
var createdAt, certIssuedAt sql.NullTime
|
||||
var status, proxyCluster, sessionPrivateKey, sessionPublicKey sql.NullString
|
||||
var mode, source, sourcePeer sql.NullString
|
||||
var terminated sql.NullBool
|
||||
var terminated, portAutoAssigned sql.NullBool
|
||||
var listenPort sql.NullInt64
|
||||
err := row.Scan(
|
||||
&s.ID,
|
||||
&s.AccountID,
|
||||
@@ -2116,8 +2122,8 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
|
||||
&sessionPrivateKey,
|
||||
&sessionPublicKey,
|
||||
&mode,
|
||||
&s.ListenPort,
|
||||
&s.PortAutoAssigned,
|
||||
&listenPort,
|
||||
&portAutoAssigned,
|
||||
&source,
|
||||
&sourcePeer,
|
||||
&terminated,
|
||||
@@ -2164,6 +2170,12 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
|
||||
if terminated.Valid {
|
||||
s.Terminated = terminated.Bool
|
||||
}
|
||||
if portAutoAssigned.Valid {
|
||||
s.PortAutoAssigned = portAutoAssigned.Bool
|
||||
}
|
||||
if listenPort.Valid {
|
||||
s.ListenPort = uint16(listenPort.Int64)
|
||||
}
|
||||
s.Targets = []*rpservice.Target{}
|
||||
return &s, nil
|
||||
})
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
proxydomain "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
|
||||
rpservice "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/zones"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/zones/records"
|
||||
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
||||
@@ -350,6 +352,35 @@ func TestSqlite_DeleteAccount(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
account.Services = []*rpservice.Service{
|
||||
{
|
||||
ID: "service_id",
|
||||
AccountID: account.Id,
|
||||
Name: "test service",
|
||||
Domain: "svc.example.com",
|
||||
Enabled: true,
|
||||
Targets: []*rpservice.Target{
|
||||
{
|
||||
AccountID: account.Id,
|
||||
ServiceID: "service_id",
|
||||
Host: "localhost",
|
||||
Port: 8080,
|
||||
Protocol: "http",
|
||||
Enabled: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
account.Domains = []*proxydomain.Domain{
|
||||
{
|
||||
ID: "domain_id",
|
||||
Domain: "custom.example.com",
|
||||
AccountID: account.Id,
|
||||
Validated: true,
|
||||
},
|
||||
}
|
||||
|
||||
err = store.SaveAccount(context.Background(), account)
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -411,6 +442,20 @@ func TestSqlite_DeleteAccount(t *testing.T) {
|
||||
require.NoError(t, err, "expecting no error after removing DeleteAccount when searching for network resources")
|
||||
require.Len(t, resources, 0, "expecting no network resources to be found after DeleteAccount")
|
||||
}
|
||||
|
||||
domains, err := store.ListCustomDomains(context.Background(), account.Id)
|
||||
require.NoError(t, err, "expecting no error after DeleteAccount when searching for custom domains")
|
||||
require.Len(t, domains, 0, "expecting no custom domains to be found after DeleteAccount")
|
||||
|
||||
var services []*rpservice.Service
|
||||
err = store.(*SqlStore).db.Model(&rpservice.Service{}).Find(&services, "account_id = ?", account.Id).Error
|
||||
require.NoError(t, err, "expecting no error after DeleteAccount when searching for services")
|
||||
require.Len(t, services, 0, "expecting no services to be found after DeleteAccount")
|
||||
|
||||
var targets []*rpservice.Target
|
||||
err = store.(*SqlStore).db.Model(&rpservice.Target{}).Find(&targets, "account_id = ?", account.Id).Error
|
||||
require.NoError(t, err, "expecting no error after DeleteAccount when searching for service targets")
|
||||
require.Len(t, targets, 0, "expecting no service targets to be found after DeleteAccount")
|
||||
}
|
||||
|
||||
func Test_GetAccount(t *testing.T) {
|
||||
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
|
||||
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
||||
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
|
||||
@@ -265,6 +266,7 @@ func setupBenchmarkDB(b testing.TB) (*SqlStore, func(), string) {
|
||||
&nbdns.NameServerGroup{}, &posture.Checks{}, &networkTypes.Network{},
|
||||
&routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{},
|
||||
&types.AccountOnboarding{}, &service.Service{}, &service.Target{},
|
||||
&domain.Domain{},
|
||||
}
|
||||
|
||||
for i := len(models) - 1; i >= 0; i-- {
|
||||
|
||||
@@ -448,6 +448,12 @@ func getMigrationsPreAuto(ctx context.Context) []migrationFunc {
|
||||
func(db *gorm.DB) error {
|
||||
return migration.RemoveDuplicatePeerKeys(ctx, db)
|
||||
},
|
||||
func(db *gorm.DB) error {
|
||||
return migration.CleanupOrphanedResources[rpservice.Service, types.Account](ctx, db, "account_id")
|
||||
},
|
||||
func(db *gorm.DB) error {
|
||||
return migration.CleanupOrphanedResources[domain.Domain, types.Account](ctx, db, "account_id")
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/netbirdio/netbird/client/ssh/auth"
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
proxydomain "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/zones"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/zones/records"
|
||||
@@ -101,6 +102,7 @@ type Account struct {
|
||||
DNSSettings DNSSettings `gorm:"embedded;embeddedPrefix:dns_settings_"`
|
||||
PostureChecks []*posture.Checks `gorm:"foreignKey:AccountID;references:id"`
|
||||
Services []*service.Service `gorm:"foreignKey:AccountID;references:id"`
|
||||
Domains []*proxydomain.Domain `gorm:"foreignKey:AccountID;references:id"`
|
||||
// Settings is a dictionary of Account settings
|
||||
Settings *Settings `gorm:"embedded;embeddedPrefix:settings_"`
|
||||
Networks []*networkTypes.Network `gorm:"foreignKey:AccountID;references:id"`
|
||||
@@ -911,6 +913,11 @@ func (a *Account) Copy() *Account {
|
||||
services = append(services, svc.Copy())
|
||||
}
|
||||
|
||||
domains := []*proxydomain.Domain{}
|
||||
for _, domain := range a.Domains {
|
||||
domains = append(domains, domain.Copy())
|
||||
}
|
||||
|
||||
return &Account{
|
||||
Id: a.Id,
|
||||
CreatedBy: a.CreatedBy,
|
||||
@@ -936,6 +943,7 @@ func (a *Account) Copy() *Account {
|
||||
Onboarding: a.Onboarding,
|
||||
NetworkMapCache: a.NetworkMapCache,
|
||||
nmapInitOnce: a.nmapInitOnce,
|
||||
Domains: domains,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4751,6 +4751,7 @@ components:
|
||||
enum:
|
||||
- email
|
||||
- webhook
|
||||
- slack
|
||||
example: "email"
|
||||
NotificationEventType:
|
||||
type: string
|
||||
@@ -4804,6 +4805,7 @@ components:
|
||||
Channel-specific target configuration. The shape depends on the `type` field:
|
||||
- `email`: requires an `EmailTarget` object
|
||||
- `webhook`: requires a `WebhookTarget` object
|
||||
- `slack`: requires a `WebhookTarget` object
|
||||
oneOf:
|
||||
- $ref: '#/components/schemas/EmailTarget'
|
||||
- $ref: '#/components/schemas/WebhookTarget'
|
||||
@@ -4837,6 +4839,7 @@ components:
|
||||
Channel-specific target configuration. The shape depends on the `type` field:
|
||||
- `email`: an `EmailTarget` object
|
||||
- `webhook`: a `WebhookTarget` object
|
||||
- `slack`: a `WebhookTarget` object
|
||||
oneOf:
|
||||
- $ref: '#/components/schemas/EmailTarget'
|
||||
- $ref: '#/components/schemas/WebhookTarget'
|
||||
|
||||
@@ -686,6 +686,7 @@ func (e NetworkResourceType) Valid() bool {
|
||||
// Defines values for NotificationChannelType.
|
||||
const (
|
||||
NotificationChannelTypeEmail NotificationChannelType = "email"
|
||||
NotificationChannelTypeSlack NotificationChannelType = "slack"
|
||||
NotificationChannelTypeWebhook NotificationChannelType = "webhook"
|
||||
)
|
||||
|
||||
@@ -694,6 +695,8 @@ func (e NotificationChannelType) Valid() bool {
|
||||
switch e {
|
||||
case NotificationChannelTypeEmail:
|
||||
return true
|
||||
case NotificationChannelTypeSlack:
|
||||
return true
|
||||
case NotificationChannelTypeWebhook:
|
||||
return true
|
||||
default:
|
||||
|
||||
@@ -333,7 +333,7 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
|
||||
dialers := c.getDialers()
|
||||
|
||||
rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -40,10 +40,10 @@ func NewRaceDial(log *log.Entry, connectionTimeout time.Duration, serverURL stri
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RaceDial) Dial() (net.Conn, error) {
|
||||
func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
|
||||
connChan := make(chan dialResult, len(r.dialerFns))
|
||||
winnerConn := make(chan net.Conn, 1)
|
||||
abortCtx, abort := context.WithCancel(context.Background())
|
||||
abortCtx, abort := context.WithCancel(ctx)
|
||||
defer abort()
|
||||
|
||||
for _, dfn := range r.dialerFns {
|
||||
|
||||
@@ -78,7 +78,7 @@ func TestRaceDialEmptyDialers(t *testing.T) {
|
||||
serverURL := "test.server.com"
|
||||
|
||||
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error with empty dialers, got nil")
|
||||
}
|
||||
@@ -104,7 +104,7 @@ func TestRaceDialSingleSuccessfulDialer(t *testing.T) {
|
||||
}
|
||||
|
||||
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
@@ -137,7 +137,7 @@ func TestRaceDialMultipleDialersWithOneSuccess(t *testing.T) {
|
||||
}
|
||||
|
||||
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
@@ -160,7 +160,7 @@ func TestRaceDialTimeout(t *testing.T) {
|
||||
}
|
||||
|
||||
rd := NewRaceDial(logger, 3*time.Second, serverURL, mockDialer)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error, got nil")
|
||||
}
|
||||
@@ -188,7 +188,7 @@ func TestRaceDialAllDialersFail(t *testing.T) {
|
||||
}
|
||||
|
||||
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err == nil {
|
||||
t.Errorf("Expected an error, got nil")
|
||||
}
|
||||
@@ -230,7 +230,7 @@ func TestRaceDialFirstSuccessfulDialerWins(t *testing.T) {
|
||||
}
|
||||
|
||||
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
|
||||
conn, err := rd.Dial()
|
||||
conn, err := rd.Dial(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("Expected no error, got %v", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user