Compare commits

...

12 Commits

Author SHA1 Message Date
bcmmbaga
91a6577182 Add support for Slack as a notification channel type
Signed-off-by: bcmmbaga <bethuelmbaga12@gmail.com>
2026-04-13 17:39:32 +03:00
Zoltan Papp
13539543af [client] Fix/grpc retry (#5750)
* [client] Fix flow client Receive retry loop not stopping after Close

Use backoff.Permanent for canceled gRPC errors so Receive returns
immediately instead of retrying until context deadline when the
connection is already closed. Add TestNewClient_PermanentClose to
verify the behavior.

The connectivity.Shutdown check was meaningless because when the connection is
shut down, c.realClient.Events(ctx, grpc.WaitForReady(true)) on the nex line
already fails with codes.Canceled — which is now handled as a permanent error.
The explicit state check was just duplicating what gRPC already reports
through its normal error path.

* [client] remove WaitForReady from stream open call

grpc.WaitForReady(true) parks the RPC call internally until the
connection reaches READY, only unblocking on ctx cancellation.
This means the external backoff.Retry loop in Receive() never gets
control back during a connection outage — it cannot tick, log, or
apply its retry intervals while WaitForReady is blocking.

Removing it restores fail-fast behaviour: Events() returns immediately
with codes.Unavailable when the connection is not ready, which is
exactly what the backoff loop expects. The backoff becomes the single
authority over retry timing and cadence, as originally intended.

* [client] Add connection recreation and improve flow client error handling

Store gRPC dial options on the client to enable connection recreation
on Internal errors (RST_STREAM/PROTOCOL_ERROR). Treat Unauthenticated,
PermissionDenied, and Unimplemented as permanent failures. Unify mutex
usage and add reconnection logging for better observability.

* [client] Remove Unauthenticated, PermissionDenied, and Unimplemented from permanent error handling

* [client] Fix error handling in Receive to properly re-establish stream and improve reconnection messaging

* Fix test

* [client] Add graceful shutdown handling and test for concurrent Close during Receive

Prevent reconnection attempts after client closure by tracking a `closed` flag. Use `backoff.Permanent` for errors caused by operations on a closed client. Add a test to ensure `Close` does not block when `Receive` is actively running.

* [client] Fix connection swap to properly close old gRPC connection

Close the old `gRPC.ClientConn` after successfully swapping to a new connection during reconnection.

* [client] Reset backoff

* [client] Ensure stream closure on error during initialization

* [client] Add test for handling server-side stream closure and reconnection

Introduce `TestReceive_ServerClosesStream` to verify the client's ability to recover and process acknowledgments after the server closes the stream. Enhance test server with a controlled stream closure mechanism.

* [client] Add protocol error simulation and enhance reconnection test

Introduce `connTrackListener` to simulate HTTP/2 RST_STREAM with PROTOCOL_ERROR for testing. Refactor and rename `TestReceive_ServerClosesStream` to `TestReceive_ProtocolErrorStreamReconnect` to verify client recovery on protocol errors.

* [client] Update Close error message in test for clarity

* [client] Fine-tune the tests

* [client] Adjust connection tracking in reconnection test

* [client] Wait for Events handler to exit in RST_STREAM reconnection test

Ensure the old `Events` handler exits fully before proceeding in the reconnection test to avoid dropped acknowledgments on a broken stream. Add a `handlerDone` channel to synchronize handler exits.

* [client] Prevent panic on nil connection during Close

* [client] Refactor connection handling to use explicit target tracking

Introduce `target` field to store the gRPC connection target directly, simplifying reconnections and ensuring consistent connection reuse logic.

* [client] Rename `isCancellation` to `isContextDone` and extend handling for `DeadlineExceeded`

Refactor error handling to include `DeadlineExceeded` scenarios alongside `Canceled`. Update related condition checks for consistency.

* [client] Add connection generation tracking to prevent stale reconnections

Introduce `connGen` to track connection generations and ensure that stale `recreateConnection` calls do not override newer connections. Update stream establishment and reconnection logic to incorporate generation validation.

* [client] Add backoff reset condition to prevent short-lived retry cycles

Refine backoff reset logic to ensure it only occurs for sufficiently long-lived stream connections, avoiding interference with `MaxElapsedTime`.

* [client] Introduce `minHealthyDuration` to refine backoff reset logic

Add `minHealthyDuration` constant to ensure stream retries only reset the backoff timer if the stream survives beyond a minimum duration. Prevents unhealthy, short-lived streams from interfering with `MaxElapsedTime`.

* [client] IPv6 friendly connection

parsedURL.Hostname() strips IPv6 brackets. For http://[::1]:443, this turns it into ::1:443, which is not a valid host:port target for gRPC. Additionally, fmt.Sprintf("%s:%s", hostname, port) produces a trailing colon when the URL has no explicit port—http://example.com becomes example.com:. Both cases break the initial dial and reconnect paths. Use parsedURL.Host directly instead.

* [client] Add `handlerStarted` channel to synchronize stream establishment in tests

Introduce `handlerStarted` channel in the test server to signal when the server-side handler begins, ensuring robust synchronization between client and server during stream establishment. Update relevant test cases to wait for this signal before proceeding.

* [client] Replace `receivedAcks` map with atomic counter and improve stream establishment sync in tests

Refactor acknowledgment tracking in tests to use an `atomic.Int32` counter instead of a map. Replace fixed sleep with robust synchronization by waiting on `handlerStarted` signal for stream establishment.

* [client] Extract `handleReceiveError` to simplify receive logic

Refactor error handling in `receive` to a dedicated `handleReceiveError` method. Streamlines the main logic and isolates error recovery, including backoff reset and connection recreation.

* [client] recreate gRPC ClientConn on every retry to prevent dual backoff

The flow client had two competing retry loops: our custom exponential
backoff and gRPC's internal subchannel reconnection. When establishStream
failed, the same ClientConn was reused, allowing gRPC's internal backoff
state to accumulate and control dial timing independently.

Changes:
- Consolidate error handling into handleRetryableError, which now
 handles context cancellation, permanent errors, backoff reset,
 and connection recreation in a single path
- Call recreateConnection on every retryable error so each retry
 gets a fresh ClientConn with no internal backoff state
- Remove connGen tracking since Receive is sequential and protected
 by a new receiving guard against concurrent calls
- Reduce RandomizationFactor from 1 to 0.5 to avoid near-zero
 backoff intervals
2026-04-13 10:42:24 +02:00
Zoltan Papp
7483fec048 Fix Android internet blackhole caused by stale route re-injection on TUN rebuild (#5865)
extraInitialRoutes() was meant to preserve only the fake IP route
(240.0.0.0/8) across TUN rebuilds, but it re-injected any initial
route missing from the current set. When the management server
advertised exit node routes (0.0.0.0/0) that were later filtered
by the route selector, extraInitialRoutes() re-added them, causing
the Android VPN to capture all traffic with no peer to handle it.

Store the fake IP route explicitly and append only that in notify(),
removing the overly broad initial route diffing.
2026-04-13 09:38:38 +02:00
Pascal Fischer
5259e5df51 [management] add domain and service cleanup migration (#5850) 2026-04-11 12:00:40 +02:00
Zoltan Papp
ebd78e0122 [client] Update RaceDial to accept context for improved cancellation handling (#5849) 2026-04-10 20:51:04 +02:00
Pascal Fischer
cf86b9a528 [management] enable access log cleanup by default (#5842) 2026-04-10 17:07:27 +02:00
Pascal Fischer
ee588e1536 Revert "[management] allow local routing peer resource (#5814)" (#5847) 2026-04-10 14:53:47 +02:00
Pascal Fischer
2a8aacc5c9 [management] allow local routing peer resource (#5814) 2026-04-10 13:08:21 +02:00
Pascal Fischer
15709bc666 [management] update account delete with proper proxy domain and service cleanup (#5817) 2026-04-10 13:08:04 +02:00
Pascal Fischer
789b4113fe [misc] update dashboards (#5840) 2026-04-10 12:15:58 +02:00
Viktor Liu
d2cdc0efec [client] Use native firewall for peer ACLs in userspace WireGuard mode (#5668) 2026-04-10 09:12:13 +08:00
Pascal Fischer
ee343d5d77 [management] use sql null vars (#5844) 2026-04-09 18:12:38 +02:00
35 changed files with 980 additions and 179 deletions

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"os"
"strconv"
"github.com/coreos/go-iptables/iptables"
"github.com/google/nftables"
@@ -35,20 +36,27 @@ const SKIP_NFTABLES_ENV = "NB_SKIP_NFTABLES_CHECK"
type FWType int
func NewFirewall(iface IFaceMapper, stateManager *statemanager.Manager, flowLogger nftypes.FlowLogger, disableServerRoutes bool, mtu uint16) (firewall.Manager, error) {
// on the linux system we try to user nftables or iptables
// in any case, because we need to allow netbird interface traffic
// so we use AllowNetbird traffic from these firewall managers
// for the userspace packet filtering firewall
// We run in userspace mode and force userspace firewall was requested. We don't attempt native firewall.
if iface.IsUserspaceBind() && forceUserspaceFirewall() {
log.Info("forcing userspace firewall")
return createUserspaceFirewall(iface, nil, disableServerRoutes, flowLogger, mtu)
}
// Use native firewall for either kernel or userspace, the interface appears identical to netfilter
fm, err := createNativeFirewall(iface, stateManager, disableServerRoutes, mtu)
// Kernel cannot fall back to anything else, need to return error
if !iface.IsUserspaceBind() {
return fm, err
}
// Fall back to the userspace packet filter if native is unavailable
if err != nil {
log.Warnf("failed to create native firewall: %v. Proceeding with userspace", err)
return createUserspaceFirewall(iface, nil, disableServerRoutes, flowLogger, mtu)
}
return createUserspaceFirewall(iface, fm, disableServerRoutes, flowLogger, mtu)
return fm, nil
}
func createNativeFirewall(iface IFaceMapper, stateManager *statemanager.Manager, routes bool, mtu uint16) (firewall.Manager, error) {
@@ -160,3 +168,17 @@ func isIptablesClientAvailable(client *iptables.IPTables) bool {
_, err := client.ListChains("filter")
return err == nil
}
func forceUserspaceFirewall() bool {
val := os.Getenv(EnvForceUserspaceFirewall)
if val == "" {
return false
}
force, err := strconv.ParseBool(val)
if err != nil {
log.Warnf("failed to parse %s: %v", EnvForceUserspaceFirewall, err)
return false
}
return force
}

View File

@@ -7,6 +7,12 @@ import (
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
// EnvForceUserspaceFirewall forces the use of the userspace packet filter even when
// native iptables/nftables is available. This only applies when the WireGuard interface
// runs in userspace mode. When set, peer ACLs are handled by USPFilter instead of
// kernel netfilter rules.
const EnvForceUserspaceFirewall = "NB_FORCE_USERSPACE_FIREWALL"
// IFaceMapper defines subset methods of interface required for manager
type IFaceMapper interface {
Name() string

View File

@@ -33,7 +33,6 @@ type Manager struct {
type iFaceMapper interface {
Name() string
Address() wgaddr.Address
IsUserspaceBind() bool
}
// Create iptables firewall manager
@@ -64,10 +63,9 @@ func Create(wgIface iFaceMapper, mtu uint16) (*Manager, error) {
func (m *Manager) Init(stateManager *statemanager.Manager) error {
state := &ShutdownState{
InterfaceState: &InterfaceState{
NameStr: m.wgIface.Name(),
WGAddress: m.wgIface.Address(),
UserspaceBind: m.wgIface.IsUserspaceBind(),
MTU: m.router.mtu,
NameStr: m.wgIface.Name(),
WGAddress: m.wgIface.Address(),
MTU: m.router.mtu,
},
}
stateManager.RegisterState(state)
@@ -203,12 +201,10 @@ func (m *Manager) Close(stateManager *statemanager.Manager) error {
return nberrors.FormatErrorOrNil(merr)
}
// AllowNetbird allows netbird interface traffic
// AllowNetbird allows netbird interface traffic.
// This is called when USPFilter wraps the native firewall, adding blanket accept
// rules so that packet filtering is handled in userspace instead of by netfilter.
func (m *Manager) AllowNetbird() error {
if !m.wgIface.IsUserspaceBind() {
return nil
}
_, err := m.AddPeerFiltering(
nil,
net.IP{0, 0, 0, 0},

View File

@@ -47,8 +47,6 @@ func (i *iFaceMock) Address() wgaddr.Address {
panic("AddressFunc is not set")
}
func (i *iFaceMock) IsUserspaceBind() bool { return false }
func TestIptablesManager(t *testing.T) {
ipv4Client, err := iptables.NewWithProtocol(iptables.ProtocolIPv4)
require.NoError(t, err)

View File

@@ -9,10 +9,9 @@ import (
)
type InterfaceState struct {
NameStr string `json:"name"`
WGAddress wgaddr.Address `json:"wg_address"`
UserspaceBind bool `json:"userspace_bind"`
MTU uint16 `json:"mtu"`
NameStr string `json:"name"`
WGAddress wgaddr.Address `json:"wg_address"`
MTU uint16 `json:"mtu"`
}
func (i *InterfaceState) Name() string {
@@ -23,10 +22,6 @@ func (i *InterfaceState) Address() wgaddr.Address {
return i.WGAddress
}
func (i *InterfaceState) IsUserspaceBind() bool {
return i.UserspaceBind
}
type ShutdownState struct {
sync.Mutex

View File

@@ -40,7 +40,6 @@ func getTableName() string {
type iFaceMapper interface {
Name() string
Address() wgaddr.Address
IsUserspaceBind() bool
}
// Manager of iptables firewall
@@ -106,10 +105,9 @@ func (m *Manager) Init(stateManager *statemanager.Manager) error {
// cleanup using Close() without needing to store specific rules.
if err := stateManager.UpdateState(&ShutdownState{
InterfaceState: &InterfaceState{
NameStr: m.wgIface.Name(),
WGAddress: m.wgIface.Address(),
UserspaceBind: m.wgIface.IsUserspaceBind(),
MTU: m.router.mtu,
NameStr: m.wgIface.Name(),
WGAddress: m.wgIface.Address(),
MTU: m.router.mtu,
},
}); err != nil {
log.Errorf("failed to update state: %v", err)
@@ -205,12 +203,10 @@ func (m *Manager) RemoveNatRule(pair firewall.RouterPair) error {
return m.router.RemoveNatRule(pair)
}
// AllowNetbird allows netbird interface traffic
// AllowNetbird allows netbird interface traffic.
// This is called when USPFilter wraps the native firewall, adding blanket accept
// rules so that packet filtering is handled in userspace instead of by netfilter.
func (m *Manager) AllowNetbird() error {
if !m.wgIface.IsUserspaceBind() {
return nil
}
m.mutex.Lock()
defer m.mutex.Unlock()

View File

@@ -52,8 +52,6 @@ func (i *iFaceMock) Address() wgaddr.Address {
panic("AddressFunc is not set")
}
func (i *iFaceMock) IsUserspaceBind() bool { return false }
func TestNftablesManager(t *testing.T) {
// just check on the local interface

View File

@@ -8,10 +8,9 @@ import (
)
type InterfaceState struct {
NameStr string `json:"name"`
WGAddress wgaddr.Address `json:"wg_address"`
UserspaceBind bool `json:"userspace_bind"`
MTU uint16 `json:"mtu"`
NameStr string `json:"name"`
WGAddress wgaddr.Address `json:"wg_address"`
MTU uint16 `json:"mtu"`
}
func (i *InterfaceState) Name() string {
@@ -22,10 +21,6 @@ func (i *InterfaceState) Address() wgaddr.Address {
return i.WGAddress
}
func (i *InterfaceState) IsUserspaceBind() bool {
return i.UserspaceBind
}
type ShutdownState struct {
InterfaceState *InterfaceState `json:"interface_state,omitempty"`
}

View File

@@ -19,6 +19,9 @@ import (
var flowLogger = netflow.NewManager(nil, []byte{}, nil).GetLogger()
func TestDefaultManager(t *testing.T) {
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
networkMap := &mgmProto.NetworkMap{
FirewallRules: []*mgmProto.FirewallRule{
{
@@ -135,6 +138,7 @@ func TestDefaultManager(t *testing.T) {
func TestDefaultManagerStateless(t *testing.T) {
// stateless currently only in userspace, so we have to disable kernel
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
t.Setenv("NB_DISABLE_CONNTRACK", "true")
networkMap := &mgmProto.NetworkMap{
@@ -194,6 +198,7 @@ func TestDefaultManagerStateless(t *testing.T) {
// This tests the full ACL manager -> uspfilter integration.
func TestDenyRulesNotAccumulatedOnRepeatedApply(t *testing.T) {
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
networkMap := &mgmProto.NetworkMap{
FirewallRules: []*mgmProto.FirewallRule{
@@ -258,6 +263,7 @@ func TestDenyRulesNotAccumulatedOnRepeatedApply(t *testing.T) {
// up when they're removed from the network map in a subsequent update.
func TestDenyRulesCleanedUpOnRemoval(t *testing.T) {
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -339,6 +345,7 @@ func TestDenyRulesCleanedUpOnRemoval(t *testing.T) {
// one added without leaking.
func TestRuleUpdateChangingAction(t *testing.T) {
t.Setenv("NB_WG_KERNEL_DISABLED", "true")
t.Setenv(firewall.EnvForceUserspaceFirewall, "true")
ctrl := gomock.NewController(t)
defer ctrl.Finish()

View File

@@ -168,6 +168,7 @@ func (m *DefaultManager) setupAndroidRoutes(config ManagerConfig) {
NetworkType: route.IPv4Network,
}
cr = append(cr, fakeIPRoute)
m.notifier.SetFakeIPRoute(fakeIPRoute)
}
m.notifier.SetInitialClientRoutes(cr, routesForComparison)

View File

@@ -16,6 +16,7 @@ import (
type Notifier struct {
initialRoutes []*route.Route
currentRoutes []*route.Route
fakeIPRoute *route.Route
listener listener.NetworkChangeListener
listenerMux sync.Mutex
@@ -31,13 +32,17 @@ func (n *Notifier) SetListener(listener listener.NetworkChangeListener) {
n.listener = listener
}
// SetInitialClientRoutes stores the full initial route set (including fake IP blocks)
// and a separate comparison set (without fake IP blocks) for diff detection.
// SetInitialClientRoutes stores the initial route sets for TUN configuration.
func (n *Notifier) SetInitialClientRoutes(initialRoutes []*route.Route, routesForComparison []*route.Route) {
n.initialRoutes = filterStatic(initialRoutes)
n.currentRoutes = filterStatic(routesForComparison)
}
// SetFakeIPRoute stores the fake IP route to be included in every TUN rebuild.
func (n *Notifier) SetFakeIPRoute(r *route.Route) {
n.fakeIPRoute = r
}
func (n *Notifier) OnNewRoutes(idMap route.HAMap) {
var newRoutes []*route.Route
for _, routes := range idMap {
@@ -69,7 +74,9 @@ func (n *Notifier) notify() {
}
allRoutes := slices.Clone(n.currentRoutes)
allRoutes = append(allRoutes, n.extraInitialRoutes()...)
if n.fakeIPRoute != nil {
allRoutes = append(allRoutes, n.fakeIPRoute)
}
routeStrings := n.routesToStrings(allRoutes)
sort.Strings(routeStrings)
@@ -78,23 +85,6 @@ func (n *Notifier) notify() {
}(n.listener)
}
// extraInitialRoutes returns initialRoutes whose network prefix is absent
// from currentRoutes (e.g. the fake IP block added at setup time).
func (n *Notifier) extraInitialRoutes() []*route.Route {
currentNets := make(map[netip.Prefix]struct{}, len(n.currentRoutes))
for _, r := range n.currentRoutes {
currentNets[r.Network] = struct{}{}
}
var extra []*route.Route
for _, r := range n.initialRoutes {
if _, ok := currentNets[r.Network]; !ok {
extra = append(extra, r)
}
}
return extra
}
func filterStatic(routes []*route.Route) []*route.Route {
out := make([]*route.Route, 0, len(routes))
for _, r := range routes {

View File

@@ -34,6 +34,10 @@ func (n *Notifier) SetInitialClientRoutes([]*route.Route, []*route.Route) {
// iOS doesn't care about initial routes
}
func (n *Notifier) SetFakeIPRoute(*route.Route) {
// Not used on iOS
}
func (n *Notifier) OnNewRoutes(route.HAMap) {
// Not used on iOS
}

View File

@@ -23,6 +23,10 @@ func (n *Notifier) SetInitialClientRoutes([]*route.Route, []*route.Route) {
// Not used on non-mobile platforms
}
func (n *Notifier) SetFakeIPRoute(*route.Route) {
// Not used on non-mobile platforms
}
func (n *Notifier) OnNewRoutes(idMap route.HAMap) {
// Not used on non-mobile platforms
}

View File

@@ -179,9 +179,11 @@ type StoreConfig struct {
// ReverseProxyConfig contains reverse proxy settings
type ReverseProxyConfig struct {
TrustedHTTPProxies []string `yaml:"trustedHTTPProxies"`
TrustedHTTPProxiesCount uint `yaml:"trustedHTTPProxiesCount"`
TrustedPeers []string `yaml:"trustedPeers"`
TrustedHTTPProxies []string `yaml:"trustedHTTPProxies"`
TrustedHTTPProxiesCount uint `yaml:"trustedHTTPProxiesCount"`
TrustedPeers []string `yaml:"trustedPeers"`
AccessLogRetentionDays int `yaml:"accessLogRetentionDays"`
AccessLogCleanupIntervalHours int `yaml:"accessLogCleanupIntervalHours"`
}
// DefaultConfig returns a CombinedConfig with default values
@@ -645,7 +647,9 @@ func (c *CombinedConfig) ToManagementConfig() (*nbconfig.Config, error) {
// Build reverse proxy config
reverseProxy := nbconfig.ReverseProxy{
TrustedHTTPProxiesCount: mgmt.ReverseProxy.TrustedHTTPProxiesCount,
TrustedHTTPProxiesCount: mgmt.ReverseProxy.TrustedHTTPProxiesCount,
AccessLogRetentionDays: mgmt.ReverseProxy.AccessLogRetentionDays,
AccessLogCleanupIntervalHours: mgmt.ReverseProxy.AccessLogCleanupIntervalHours,
}
for _, p := range mgmt.ReverseProxy.TrustedHTTPProxies {
if prefix, err := netip.ParsePrefix(p); err == nil {

View File

@@ -14,7 +14,6 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
@@ -26,11 +25,22 @@ import (
"github.com/netbirdio/netbird/util/wsproxy"
)
var ErrClientClosed = errors.New("client is closed")
// minHealthyDuration is the minimum time a stream must survive before a failure
// resets the backoff timer. Streams that fail faster are considered unhealthy and
// should not reset backoff, so that MaxElapsedTime can eventually stop retries.
const minHealthyDuration = 5 * time.Second
type GRPCClient struct {
realClient proto.FlowServiceClient
clientConn *grpc.ClientConn
stream proto.FlowService_EventsClient
streamMu sync.Mutex
target string
opts []grpc.DialOption
closed bool // prevent creating conn in the middle of the Close
receiving bool // prevent concurrent Receive calls
mu sync.Mutex // protects clientConn, realClient, stream, closed, and receiving
}
func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
@@ -65,7 +75,8 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
)
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
target := parsedURL.Host
conn, err := grpc.NewClient(target, opts...)
if err != nil {
return nil, fmt.Errorf("creating new grpc client: %w", err)
}
@@ -73,30 +84,73 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
return &GRPCClient{
realClient: proto.NewFlowServiceClient(conn),
clientConn: conn,
target: target,
opts: opts,
}, nil
}
func (c *GRPCClient) Close() error {
c.streamMu.Lock()
defer c.streamMu.Unlock()
c.mu.Lock()
c.closed = true
c.stream = nil
if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
conn := c.clientConn
c.clientConn = nil
c.mu.Unlock()
if conn == nil {
return nil
}
if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("close client connection: %w", err)
}
return nil
}
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
c.mu.Lock()
stream := c.stream
c.mu.Unlock()
if stream == nil {
return errors.New("stream not initialized")
}
if err := stream.Send(event); err != nil {
return fmt.Errorf("send flow event: %w", err)
}
return nil
}
func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
c.mu.Lock()
if c.receiving {
c.mu.Unlock()
return errors.New("concurrent Receive calls are not supported")
}
c.receiving = true
c.mu.Unlock()
defer func() {
c.mu.Lock()
c.receiving = false
c.mu.Unlock()
}()
backOff := defaultBackoff(ctx, interval)
operation := func() error {
if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
return fmt.Errorf("receive: %w: %w", err, context.Canceled)
}
stream, err := c.establishStream(ctx)
if err != nil {
log.Errorf("failed to establish flow stream, retrying: %v", err)
return c.handleRetryableError(err, time.Time{}, backOff)
}
streamStart := time.Now()
if err := c.receive(stream, msgHandler); err != nil {
log.Errorf("receive failed: %v", err)
return fmt.Errorf("receive: %w", err)
return c.handleRetryableError(err, streamStart, backOff)
}
return nil
}
@@ -108,37 +162,106 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
return nil
}
func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
if c.clientConn.GetState() == connectivity.Shutdown {
return errors.New("connection to flow receiver has been shut down")
// handleRetryableError resets the backoff timer if the stream was healthy long
// enough and recreates the underlying ClientConn so that gRPC's internal
// subchannel backoff does not accumulate and compete with our own retry timer.
// A zero streamStart means the stream was never established.
func (c *GRPCClient) handleRetryableError(err error, streamStart time.Time, backOff backoff.BackOff) error {
if isContextDone(err) {
return backoff.Permanent(err)
}
stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
if err != nil {
return fmt.Errorf("create event stream: %w", err)
var permErr *backoff.PermanentError
if errors.As(err, &permErr) {
return err
}
err = stream.Send(&proto.FlowEvent{IsInitiator: true})
// Reset the backoff so the next retry starts with a short delay instead of
// continuing the already-elapsed timer. Only do this if the stream was healthy
// long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime.
if !streamStart.IsZero() && time.Since(streamStart) >= minHealthyDuration {
backOff.Reset()
}
if recreateErr := c.recreateConnection(); recreateErr != nil {
log.Errorf("recreate connection: %v", recreateErr)
return recreateErr
}
log.Infof("connection recreated, retrying stream")
return fmt.Errorf("retrying after error: %w", err)
}
func (c *GRPCClient) recreateConnection() error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return backoff.Permanent(ErrClientClosed)
}
conn, err := grpc.NewClient(c.target, c.opts...)
if err != nil {
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
c.mu.Unlock()
return fmt.Errorf("create new connection: %w", err)
}
old := c.clientConn
c.clientConn = conn
c.realClient = proto.NewFlowServiceClient(conn)
c.stream = nil
c.mu.Unlock()
_ = old.Close()
return nil
}
func (c *GRPCClient) establishStream(ctx context.Context) (proto.FlowService_EventsClient, error) {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, backoff.Permanent(ErrClientClosed)
}
cl := c.realClient
c.mu.Unlock()
// open stream outside the lock — blocking operation
stream, err := cl.Events(ctx)
if err != nil {
return nil, fmt.Errorf("create event stream: %w", err)
}
streamReady := false
defer func() {
if !streamReady {
_ = stream.CloseSend()
}
}()
if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
return nil, fmt.Errorf("send initiator: %w", err)
}
if err = checkHeader(stream); err != nil {
return fmt.Errorf("check header: %w", err)
return nil, fmt.Errorf("check header: %w", err)
}
c.streamMu.Lock()
c.mu.Lock()
if c.closed {
c.mu.Unlock()
return nil, backoff.Permanent(ErrClientClosed)
}
c.stream = stream
c.streamMu.Unlock()
c.mu.Unlock()
streamReady = true
return c.receive(stream, msgHandler)
return stream, nil
}
func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler func(msg *proto.FlowEventAck) error) error {
for {
msg, err := stream.Recv()
if err != nil {
return fmt.Errorf("receive from stream: %w", err)
return err
}
if msg.IsInitiator {
@@ -169,7 +292,7 @@ func checkHeader(stream proto.FlowService_EventsClient) error {
func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff {
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 1,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: interval / 2,
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
@@ -178,18 +301,12 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
}, ctx)
}
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
c.streamMu.Lock()
stream := c.stream
c.streamMu.Unlock()
if stream == nil {
return errors.New("stream not initialized")
func isContextDone(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}
if err := stream.Send(event); err != nil {
return fmt.Errorf("send flow event: %w", err)
if s, ok := status.FromError(err); ok {
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
}
return nil
return false
}

View File

@@ -2,8 +2,11 @@ package client_test
import (
"context"
"encoding/binary"
"errors"
"net"
"sync"
"sync/atomic"
"testing"
"time"
@@ -11,6 +14,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
flow "github.com/netbirdio/netbird/flow/client"
"github.com/netbirdio/netbird/flow/proto"
@@ -18,21 +23,89 @@ import (
type testServer struct {
proto.UnimplementedFlowServiceServer
events chan *proto.FlowEvent
acks chan *proto.FlowEventAck
grpcSrv *grpc.Server
addr string
events chan *proto.FlowEvent
acks chan *proto.FlowEventAck
grpcSrv *grpc.Server
addr string
listener *connTrackListener
closeStream chan struct{} // signal server to close the stream
handlerDone chan struct{} // signaled each time Events() exits
handlerStarted chan struct{} // signaled each time Events() begins
}
// connTrackListener wraps a net.Listener to track accepted connections
// so tests can forcefully close them to simulate PROTOCOL_ERROR/RST_STREAM.
type connTrackListener struct {
net.Listener
mu sync.Mutex
conns []net.Conn
}
func (l *connTrackListener) Accept() (net.Conn, error) {
c, err := l.Listener.Accept()
if err != nil {
return nil, err
}
l.mu.Lock()
l.conns = append(l.conns, c)
l.mu.Unlock()
return c, nil
}
// sendRSTStream writes a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR
// (error code 0x1) on every tracked connection. This produces the exact error:
//
// rpc error: code = Internal desc = stream terminated by RST_STREAM with error code: PROTOCOL_ERROR
//
// HTTP/2 RST_STREAM frame format (9-byte header + 4-byte payload):
//
// Length (3 bytes): 0x000004
// Type (1 byte): 0x03 (RST_STREAM)
// Flags (1 byte): 0x00
// Stream ID (4 bytes): target stream (must have bit 31 clear)
// Error Code (4 bytes): 0x00000001 (PROTOCOL_ERROR)
func (l *connTrackListener) connCount() int {
l.mu.Lock()
defer l.mu.Unlock()
return len(l.conns)
}
func (l *connTrackListener) sendRSTStream(streamID uint32) {
l.mu.Lock()
defer l.mu.Unlock()
frame := make([]byte, 13) // 9-byte header + 4-byte payload
// Length = 4 (3 bytes, big-endian)
frame[0], frame[1], frame[2] = 0, 0, 4
// Type = RST_STREAM (0x03)
frame[3] = 0x03
// Flags = 0
frame[4] = 0x00
// Stream ID (4 bytes, big-endian, bit 31 reserved = 0)
binary.BigEndian.PutUint32(frame[5:9], streamID)
// Error Code = PROTOCOL_ERROR (0x1)
binary.BigEndian.PutUint32(frame[9:13], 0x1)
for _, c := range l.conns {
_, _ = c.Write(frame)
}
}
func newTestServer(t *testing.T) *testServer {
listener, err := net.Listen("tcp", "127.0.0.1:0")
rawListener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
listener := &connTrackListener{Listener: rawListener}
s := &testServer{
events: make(chan *proto.FlowEvent, 100),
acks: make(chan *proto.FlowEventAck, 100),
grpcSrv: grpc.NewServer(),
addr: listener.Addr().String(),
events: make(chan *proto.FlowEvent, 100),
acks: make(chan *proto.FlowEventAck, 100),
grpcSrv: grpc.NewServer(),
addr: rawListener.Addr().String(),
listener: listener,
closeStream: make(chan struct{}, 1),
handlerDone: make(chan struct{}, 10),
handlerStarted: make(chan struct{}, 10),
}
proto.RegisterFlowServiceServer(s.grpcSrv, s)
@@ -51,11 +124,23 @@ func newTestServer(t *testing.T) *testServer {
}
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
defer func() {
select {
case s.handlerDone <- struct{}{}:
default:
}
}()
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
if err != nil {
return err
}
select {
case s.handlerStarted <- struct{}{}:
default:
}
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
@@ -91,6 +176,8 @@ func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
if err := stream.Send(ack); err != nil {
return err
}
case <-s.closeStream:
return status.Errorf(codes.Internal, "server closing stream")
case <-ctx.Done():
return ctx.Err()
}
@@ -110,16 +197,13 @@ func TestReceive(t *testing.T) {
assert.NoError(t, err, "failed to close flow")
})
receivedAcks := make(map[string]bool)
var ackCount atomic.Int32
receiveDone := make(chan struct{})
go func() {
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
if !msg.IsInitiator && len(msg.EventId) > 0 {
id := string(msg.EventId)
receivedAcks[id] = true
if len(receivedAcks) >= 3 {
if ackCount.Add(1) >= 3 {
close(receiveDone)
}
}
@@ -130,7 +214,11 @@ func TestReceive(t *testing.T) {
}
}()
time.Sleep(500 * time.Millisecond)
select {
case <-server.handlerStarted:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for stream to be established")
}
for i := 0; i < 3; i++ {
eventID := uuid.New().String()
@@ -153,7 +241,7 @@ func TestReceive(t *testing.T) {
t.Fatal("timeout waiting for acks to be processed")
}
assert.Equal(t, 3, len(receivedAcks))
assert.Equal(t, int32(3), ackCount.Load())
}
func TestReceive_ContextCancellation(t *testing.T) {
@@ -254,3 +342,195 @@ func TestSend(t *testing.T) {
t.Fatal("timeout waiting for ack to be received by flow")
}
}
func TestNewClient_PermanentClose(t *testing.T) {
server := newTestServer(t)
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
require.NoError(t, err)
err = client.Close()
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
done := make(chan error, 1)
go func() {
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
return nil
})
}()
select {
case err := <-done:
require.ErrorIs(t, err, flow.ErrClientClosed)
case <-time.After(2 * time.Second):
t.Fatal("Receive did not return after Close — stuck in retry loop")
}
}
func TestNewClient_CloseVerify(t *testing.T) {
server := newTestServer(t)
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
done := make(chan error, 1)
go func() {
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
return nil
})
}()
closeDone := make(chan struct{}, 1)
go func() {
_ = client.Close()
closeDone <- struct{}{}
}()
select {
case err := <-done:
require.Error(t, err)
case <-time.After(2 * time.Second):
t.Fatal("Receive did not return after Close — stuck in retry loop")
}
select {
case <-closeDone:
return
case <-time.After(2 * time.Second):
t.Fatal("Close did not return — blocked in retry loop")
}
}
func TestClose_WhileReceiving(t *testing.T) {
server := newTestServer(t)
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
require.NoError(t, err)
ctx := context.Background() // no timeout — intentional
receiveDone := make(chan struct{})
go func() {
_ = client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
return nil
})
close(receiveDone)
}()
// Wait for the server-side handler to confirm the stream is established.
select {
case <-server.handlerStarted:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for stream to be established")
}
closeDone := make(chan struct{})
go func() {
_ = client.Close()
close(closeDone)
}()
select {
case <-closeDone:
// Close returned — good
case <-time.After(2 * time.Second):
t.Fatal("Close blocked forever — Receive stuck in retry loop")
}
select {
case <-receiveDone:
case <-time.After(2 * time.Second):
t.Fatal("Receive did not exit after Close")
}
}
func TestReceive_ProtocolErrorStreamReconnect(t *testing.T) {
server := newTestServer(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
require.NoError(t, err)
t.Cleanup(func() {
err := client.Close()
assert.NoError(t, err, "failed to close flow")
})
// Track acks received before and after server-side stream close
var ackCount atomic.Int32
receivedFirst := make(chan struct{})
receivedAfterReconnect := make(chan struct{})
go func() {
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
if msg.IsInitiator || len(msg.EventId) == 0 {
return nil
}
n := ackCount.Add(1)
if n == 1 {
close(receivedFirst)
}
if n == 2 {
close(receivedAfterReconnect)
}
return nil
})
if err != nil && !errors.Is(err, context.Canceled) {
t.Logf("receive error: %v", err)
}
}()
// Wait for stream to be established, then send first ack
select {
case <-server.handlerStarted:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for stream to be established")
}
server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")}
select {
case <-receivedFirst:
case <-time.After(3 * time.Second):
t.Fatal("timeout waiting for first ack")
}
// Snapshot connection count before injecting the fault.
connsBefore := server.listener.connCount()
// Send a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR on the TCP connection.
// gRPC multiplexes streams on stream IDs 1, 3, 5, ... (odd, client-initiated).
// Stream ID 1 is the client's first stream (our Events bidi stream).
// This produces the exact error the client sees in production:
// "stream terminated by RST_STREAM with error code: PROTOCOL_ERROR"
server.listener.sendRSTStream(1)
// Wait for the old Events() handler to fully exit so it can no longer
// drain s.acks and drop our injected ack on a broken stream.
select {
case <-server.handlerDone:
case <-time.After(5 * time.Second):
t.Fatal("old Events() handler did not exit after RST_STREAM")
}
require.Eventually(t, func() bool {
return server.listener.connCount() > connsBefore
}, 5*time.Second, 50*time.Millisecond, "client did not open a new TCP connection after RST_STREAM")
server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")}
select {
case <-receivedAfterReconnect:
// Client successfully reconnected and received ack after server-side stream close
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for ack after server-side stream close — client did not reconnect")
}
assert.GreaterOrEqual(t, int(ackCount.Load()), 2, "should have received acks before and after stream close")
assert.GreaterOrEqual(t, server.listener.connCount(), 2, "client should have created at least 2 TCP connections (original + reconnect)")
}

View File

@@ -302,7 +302,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "rate(management_account_peer_meta_update_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"expr": "rate(management_account_peer_meta_update_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
"range": true,
@@ -410,7 +410,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.5,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.5,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,
@@ -426,7 +426,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.9,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.9,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -443,7 +443,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.99,sum(increase(management_account_get_peer_network_map_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.99,sum(increase(management_account_get_peer_network_map_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -545,7 +545,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.5,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.5,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,
@@ -561,7 +561,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.9,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.9,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -578,7 +578,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.99,sum(increase(management_account_update_account_peers_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.99,sum(increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -694,7 +694,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.5,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.5,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,
@@ -710,7 +710,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.9,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.9,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -727,7 +727,7 @@
},
"disableTextWrap": false,
"editorMode": "code",
"expr": "histogram_quantile(0.99,sum(increase(management_grpc_updatechannel_queue_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"expr": "histogram_quantile(0.99,sum(increase(management_grpc_updatechannel_queue_length_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le,cluster,environment,job))",
"format": "heatmap",
"fullMetaSearch": false,
"hide": false,
@@ -841,7 +841,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.50, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.50, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"instant": false,
"legendFormat": "p50",
"range": true,
@@ -853,7 +853,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.90, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p90",
@@ -866,7 +866,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(management_store_persistence_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(management_store_persistence_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p99",
@@ -963,7 +963,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.50, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.50, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"instant": false,
"legendFormat": "p50",
"range": true,
@@ -975,7 +975,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.90, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p90",
@@ -988,7 +988,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(management_store_transaction_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(management_store_transaction_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p99",
@@ -1085,7 +1085,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.50, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.50, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"instant": false,
"legendFormat": "p50",
"range": true,
@@ -1097,7 +1097,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.90, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p90",
@@ -1110,7 +1110,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(management_store_global_lock_acquisition_duration_ms_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(management_store_global_lock_acquisition_duration_ms_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p99",
@@ -1221,7 +1221,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "rate(management_idp_authenticate_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"expr": "rate(management_idp_authenticate_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
"range": true,
@@ -1317,7 +1317,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "rate(management_idp_get_account_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"expr": "rate(management_idp_get_account_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
"range": true,
@@ -1413,7 +1413,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "rate(management_idp_update_user_meta_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"expr": "rate(management_idp_update_user_meta_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{cluster}}/{{environment}}/{{job}}",
"range": true,
@@ -1523,7 +1523,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"GET|OPTIONS\"}[$__rate_interval])) by (job,method)",
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"GET|OPTIONS\"}[$__rate_interval])) by (job,method)",
"instant": false,
"legendFormat": "{{method}}",
"range": true,
@@ -1619,7 +1619,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"POST|PUT|DELETE\"}[$__rate_interval])) by (job,method)",
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",method=~\"POST|PUT|DELETE\"}[$__rate_interval])) by (job,method)",
"instant": false,
"legendFormat": "{{method}}",
"range": true,
@@ -1715,7 +1715,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"instant": false,
"legendFormat": "p50",
"range": true,
@@ -1727,7 +1727,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p90",
@@ -1740,7 +1740,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"read\"}[5m])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p99",
@@ -1837,7 +1837,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"expr": "histogram_quantile(0.50, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"instant": false,
"legendFormat": "p50",
"range": true,
@@ -1849,7 +1849,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"expr": "histogram_quantile(0.90, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p90",
@@ -1862,7 +1862,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"expr": "histogram_quantile(0.99, sum(rate(management_http_request_duration_ms_total_milliseconds_bucket{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\",type=~\"write\"}[5m])) by (le))",
"hide": false,
"instant": false,
"legendFormat": "p99",
@@ -1963,7 +1963,7 @@
"uid": "${datasource}"
},
"editorMode": "code",
"expr": "sum(rate(management_http_request_counter_ratio_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (job,exported_endpoint,method)",
"expr": "sum(rate(management_http_request_counter_total{cluster=~\"$cluster\",environment=~\"$environment\",job=~\"$job\",host=~\"$host\"}[$__rate_interval])) by (job,exported_endpoint,method)",
"hide": false,
"instant": false,
"legendFormat": "{{method}}-{{exported_endpoint}}",
@@ -3222,7 +3222,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "sum by(le) (increase(management_grpc_updatechannel_queue_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
"expr": "sum by(le) (increase(management_grpc_updatechannel_queue_length_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,
@@ -3323,7 +3323,7 @@
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "sum by(le) (increase(management_account_update_account_peers_duration_ms_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
"expr": "sum by(le) (increase(management_account_update_account_peers_duration_ms_milliseconds_bucket{application=\"management\", environment=\"$environment\", host=~\"$host\"}[$__rate_interval]))",
"format": "heatmap",
"fullMetaSearch": false,
"includeNullMetadata": true,

View File

@@ -106,13 +106,23 @@ func (m *managerImpl) CleanupOldAccessLogs(ctx context.Context, retentionDays in
// StartPeriodicCleanup starts a background goroutine that periodically cleans up old access logs
func (m *managerImpl) StartPeriodicCleanup(ctx context.Context, retentionDays, cleanupIntervalHours int) {
if retentionDays <= 0 {
log.WithContext(ctx).Debug("periodic access log cleanup disabled: retention days is 0 or negative")
if retentionDays < 0 {
log.WithContext(ctx).Debug("periodic access log cleanup disabled: retention days is negative")
return
}
if retentionDays == 0 {
retentionDays = 7
log.WithContext(ctx).Debugf("no retention days specified for access log cleanup, defaulting to %d days", retentionDays)
} else {
log.WithContext(ctx).Debugf("access log retention period set to %d days", retentionDays)
}
if cleanupIntervalHours <= 0 {
cleanupIntervalHours = 24
log.WithContext(ctx).Debugf("no cleanup interval specified for access log cleanup, defaulting to %d hours", cleanupIntervalHours)
} else {
log.WithContext(ctx).Debugf("access log cleanup interval set to %d hours", cleanupIntervalHours)
}
cleanupCtx, cancel := context.WithCancel(ctx)

View File

@@ -121,7 +121,7 @@ func TestCleanupWithExactBoundary(t *testing.T) {
}
func TestStartPeriodicCleanup(t *testing.T) {
t.Run("periodic cleanup disabled with zero retention", func(t *testing.T) {
t.Run("periodic cleanup disabled with negative retention", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
@@ -135,7 +135,7 @@ func TestStartPeriodicCleanup(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager.StartPeriodicCleanup(ctx, 0, 1)
manager.StartPeriodicCleanup(ctx, -1, 1)
time.Sleep(100 * time.Millisecond)

View File

@@ -30,3 +30,8 @@ func (d *Domain) EventMeta() map[string]any {
"validated": d.Validated,
}
}
func (d *Domain) Copy() *Domain {
dCopy := *d
return &dCopy
}

View File

@@ -203,7 +203,7 @@ type ReverseProxy struct {
// AccessLogRetentionDays specifies the number of days to retain access logs.
// Logs older than this duration will be automatically deleted during cleanup.
// A value of 0 or negative means logs are kept indefinitely (no cleanup).
// A value of 0 will default to 7 days. Negative means logs are kept indefinitely (no cleanup).
AccessLogRetentionDays int
// AccessLogCleanupIntervalHours specifies how often (in hours) to run the cleanup routine.

View File

@@ -742,11 +742,6 @@ func (am *DefaultAccountManager) DeleteAccount(ctx context.Context, accountID, u
return status.Errorf(status.Internal, "failed to build user infos for account %s: %v", accountID, err)
}
err = am.serviceManager.DeleteAllServices(ctx, accountID, userID)
if err != nil {
return status.Errorf(status.Internal, "failed to delete service %s: %v", accountID, err)
}
for _, otherUser := range account.Users {
if otherUser.Id == userID {
continue

View File

@@ -15,7 +15,6 @@ import (
"time"
"github.com/golang/mock/gomock"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/prometheus/client_golang/prometheus/push"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -23,6 +22,9 @@ import (
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
"github.com/netbirdio/netbird/shared/management/status"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
@@ -1815,6 +1817,13 @@ func TestAccount_Copy(t *testing.T) {
Targets: []*service.Target{},
},
},
Domains: []*domain.Domain{
{
ID: "domain1",
Domain: "test.com",
AccountID: "account1",
},
},
NetworkMapCache: &types.NetworkMapBuilder{},
}
account.InitOnce()

View File

@@ -489,6 +489,102 @@ func MigrateJsonToTable[T any](ctx context.Context, db *gorm.DB, columnName stri
return nil
}
// hasForeignKey checks whether a foreign key constraint exists on the given table and column.
func hasForeignKey(db *gorm.DB, table, column string) bool {
var count int64
switch db.Name() {
case "postgres":
db.Raw(`
SELECT COUNT(*) FROM information_schema.key_column_usage kcu
JOIN information_schema.table_constraints tc
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND kcu.table_name = ?
AND kcu.column_name = ?
`, table, column).Scan(&count)
case "mysql":
db.Raw(`
SELECT COUNT(*) FROM information_schema.key_column_usage
WHERE table_schema = DATABASE()
AND table_name = ?
AND column_name = ?
AND referenced_table_name IS NOT NULL
`, table, column).Scan(&count)
default: // sqlite
type fkInfo struct {
From string
}
var fks []fkInfo
db.Raw(fmt.Sprintf("PRAGMA foreign_key_list(%s)", table)).Scan(&fks)
for _, fk := range fks {
if fk.From == column {
return true
}
}
return false
}
return count > 0
}
// CleanupOrphanedResources deletes rows from the table of model T where the foreign
// key column (fkColumn) references a row in the table of model R that no longer exists.
func CleanupOrphanedResources[T any, R any](ctx context.Context, db *gorm.DB, fkColumn string) error {
var model T
var refModel R
if !db.Migrator().HasTable(&model) {
log.WithContext(ctx).Debugf("table for %T does not exist, no cleanup needed", model)
return nil
}
if !db.Migrator().HasTable(&refModel) {
log.WithContext(ctx).Debugf("referenced table for %T does not exist, no cleanup needed", refModel)
return nil
}
stmtT := &gorm.Statement{DB: db}
if err := stmtT.Parse(&model); err != nil {
return fmt.Errorf("parse model %T: %w", model, err)
}
childTable := stmtT.Schema.Table
stmtR := &gorm.Statement{DB: db}
if err := stmtR.Parse(&refModel); err != nil {
return fmt.Errorf("parse reference model %T: %w", refModel, err)
}
parentTable := stmtR.Schema.Table
if !db.Migrator().HasColumn(&model, fkColumn) {
log.WithContext(ctx).Debugf("column %s does not exist in table %s, no cleanup needed", fkColumn, childTable)
return nil
}
// If a foreign key constraint already exists on the column, the DB itself
// enforces referential integrity and orphaned rows cannot exist.
if hasForeignKey(db, childTable, fkColumn) {
log.WithContext(ctx).Debugf("foreign key constraint for %s already exists on %s, no cleanup needed", fkColumn, childTable)
return nil
}
result := db.Exec(
fmt.Sprintf(
"DELETE FROM %s WHERE %s NOT IN (SELECT id FROM %s)",
childTable, fkColumn, parentTable,
),
)
if result.Error != nil {
return fmt.Errorf("cleanup orphaned rows in %s: %w", childTable, result.Error)
}
log.WithContext(ctx).Infof("Cleaned up %d orphaned rows from %s where %s had no matching row in %s",
result.RowsAffected, childTable, fkColumn, parentTable)
return nil
}
func RemoveDuplicatePeerKeys(ctx context.Context, db *gorm.DB) error {
if !db.Migrator().HasTable("peers") {
log.WithContext(ctx).Debug("peers table does not exist, skipping duplicate key cleanup")

View File

@@ -441,3 +441,197 @@ func TestRemoveDuplicatePeerKeys_NoTable(t *testing.T) {
err := migration.RemoveDuplicatePeerKeys(context.Background(), db)
require.NoError(t, err, "Should not fail when table does not exist")
}
type testParent struct {
ID string `gorm:"primaryKey"`
}
func (testParent) TableName() string {
return "test_parents"
}
type testChild struct {
ID string `gorm:"primaryKey"`
ParentID string
}
func (testChild) TableName() string {
return "test_children"
}
type testChildWithFK struct {
ID string `gorm:"primaryKey"`
ParentID string `gorm:"index"`
Parent *testParent `gorm:"foreignKey:ParentID"`
}
func (testChildWithFK) TableName() string {
return "test_children"
}
func setupOrphanTestDB(t *testing.T, models ...any) *gorm.DB {
t.Helper()
db := setupDatabase(t)
for _, m := range models {
_ = db.Migrator().DropTable(m)
}
err := db.AutoMigrate(models...)
require.NoError(t, err, "Failed to auto-migrate tables")
return db
}
func TestCleanupOrphanedResources_NoChildTable(t *testing.T) {
db := setupDatabase(t)
_ = db.Migrator().DropTable(&testChild{})
_ = db.Migrator().DropTable(&testParent{})
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err, "Should not fail when child table does not exist")
}
func TestCleanupOrphanedResources_NoParentTable(t *testing.T) {
db := setupDatabase(t)
_ = db.Migrator().DropTable(&testParent{})
_ = db.Migrator().DropTable(&testChild{})
err := db.AutoMigrate(&testChild{})
require.NoError(t, err)
err = migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err, "Should not fail when parent table does not exist")
}
func TestCleanupOrphanedResources_EmptyTables(t *testing.T) {
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err, "Should not fail on empty tables")
var count int64
db.Model(&testChild{}).Count(&count)
assert.Equal(t, int64(0), count)
}
func TestCleanupOrphanedResources_NoOrphans(t *testing.T) {
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c2", ParentID: "p2"}).Error)
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err)
var count int64
db.Model(&testChild{}).Count(&count)
assert.Equal(t, int64(2), count, "All children should remain when no orphans")
}
func TestCleanupOrphanedResources_AllOrphans(t *testing.T) {
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c1", "gone1").Error)
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c2", "gone2").Error)
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c3", "gone3").Error)
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err)
var count int64
db.Model(&testChild{}).Count(&count)
assert.Equal(t, int64(0), count, "All orphaned children should be deleted")
}
func TestCleanupOrphanedResources_MixedValidAndOrphaned(t *testing.T) {
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c2", ParentID: "p2"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c3", ParentID: "p1"}).Error)
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c4", "gone1").Error)
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c5", "gone2").Error)
err := migration.CleanupOrphanedResources[testChild, testParent](context.Background(), db, "parent_id")
require.NoError(t, err)
var remaining []testChild
require.NoError(t, db.Order("id").Find(&remaining).Error)
assert.Len(t, remaining, 3, "Only valid children should remain")
assert.Equal(t, "c1", remaining[0].ID)
assert.Equal(t, "c2", remaining[1].ID)
assert.Equal(t, "c3", remaining[2].ID)
}
func TestCleanupOrphanedResources_Idempotent(t *testing.T) {
db := setupOrphanTestDB(t, &testParent{}, &testChild{})
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
require.NoError(t, db.Create(&testChild{ID: "c1", ParentID: "p1"}).Error)
require.NoError(t, db.Exec("INSERT INTO test_children (id, parent_id) VALUES (?, ?)", "c2", "gone").Error)
ctx := context.Background()
err := migration.CleanupOrphanedResources[testChild, testParent](ctx, db, "parent_id")
require.NoError(t, err)
var count int64
db.Model(&testChild{}).Count(&count)
assert.Equal(t, int64(1), count)
err = migration.CleanupOrphanedResources[testChild, testParent](ctx, db, "parent_id")
require.NoError(t, err)
db.Model(&testChild{}).Count(&count)
assert.Equal(t, int64(1), count, "Count should remain the same after second run")
}
func TestCleanupOrphanedResources_SkipsWhenForeignKeyExists(t *testing.T) {
engine := os.Getenv("NETBIRD_STORE_ENGINE")
if engine != "postgres" && engine != "mysql" {
t.Skip("FK constraint early-exit test requires postgres or mysql")
}
db := setupDatabase(t)
_ = db.Migrator().DropTable(&testChildWithFK{})
_ = db.Migrator().DropTable(&testParent{})
err := db.AutoMigrate(&testParent{}, &testChildWithFK{})
require.NoError(t, err)
require.NoError(t, db.Create(&testParent{ID: "p1"}).Error)
require.NoError(t, db.Create(&testParent{ID: "p2"}).Error)
require.NoError(t, db.Create(&testChildWithFK{ID: "c1", ParentID: "p1"}).Error)
require.NoError(t, db.Create(&testChildWithFK{ID: "c2", ParentID: "p2"}).Error)
switch engine {
case "postgres":
require.NoError(t, db.Exec("ALTER TABLE test_children DROP CONSTRAINT fk_test_children_parent").Error)
require.NoError(t, db.Exec("DELETE FROM test_parents WHERE id = ?", "p2").Error)
require.NoError(t, db.Exec(
"ALTER TABLE test_children ADD CONSTRAINT fk_test_children_parent "+
"FOREIGN KEY (parent_id) REFERENCES test_parents(id) NOT VALID",
).Error)
case "mysql":
require.NoError(t, db.Exec("SET FOREIGN_KEY_CHECKS = 0").Error)
require.NoError(t, db.Exec("ALTER TABLE test_children DROP FOREIGN KEY fk_test_children_parent").Error)
require.NoError(t, db.Exec("DELETE FROM test_parents WHERE id = ?", "p2").Error)
require.NoError(t, db.Exec(
"ALTER TABLE test_children ADD CONSTRAINT fk_test_children_parent "+
"FOREIGN KEY (parent_id) REFERENCES test_parents(id)",
).Error)
require.NoError(t, db.Exec("SET FOREIGN_KEY_CHECKS = 1").Error)
}
err = migration.CleanupOrphanedResources[testChildWithFK, testParent](context.Background(), db, "parent_id")
require.NoError(t, err)
var count int64
db.Model(&testChildWithFK{}).Count(&count)
assert.Equal(t, int64(2), count, "Both rows should survive — migration must skip when FK constraint exists")
}

View File

@@ -396,6 +396,11 @@ func (s *SqlStore) DeleteAccount(ctx context.Context, account *types.Account) er
return result.Error
}
result = tx.Select(clause.Associations).Delete(account.Services, "account_id = ?", account.Id)
if result.Error != nil {
return result.Error
}
result = tx.Select(clause.Associations).Delete(account)
if result.Error != nil {
return result.Error
@@ -2099,7 +2104,8 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
var createdAt, certIssuedAt sql.NullTime
var status, proxyCluster, sessionPrivateKey, sessionPublicKey sql.NullString
var mode, source, sourcePeer sql.NullString
var terminated sql.NullBool
var terminated, portAutoAssigned sql.NullBool
var listenPort sql.NullInt64
err := row.Scan(
&s.ID,
&s.AccountID,
@@ -2116,8 +2122,8 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
&sessionPrivateKey,
&sessionPublicKey,
&mode,
&s.ListenPort,
&s.PortAutoAssigned,
&listenPort,
&portAutoAssigned,
&source,
&sourcePeer,
&terminated,
@@ -2164,6 +2170,12 @@ func (s *SqlStore) getServices(ctx context.Context, accountID string) ([]*rpserv
if terminated.Valid {
s.Terminated = terminated.Bool
}
if portAutoAssigned.Valid {
s.PortAutoAssigned = portAutoAssigned.Bool
}
if listenPort.Valid {
s.ListenPort = uint16(listenPort.Int64)
}
s.Targets = []*rpservice.Target{}
return &s, nil
})

View File

@@ -22,6 +22,8 @@ import (
"github.com/stretchr/testify/require"
nbdns "github.com/netbirdio/netbird/dns"
proxydomain "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
rpservice "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
"github.com/netbirdio/netbird/management/internals/modules/zones"
"github.com/netbirdio/netbird/management/internals/modules/zones/records"
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
@@ -350,6 +352,35 @@ func TestSqlite_DeleteAccount(t *testing.T) {
},
}
account.Services = []*rpservice.Service{
{
ID: "service_id",
AccountID: account.Id,
Name: "test service",
Domain: "svc.example.com",
Enabled: true,
Targets: []*rpservice.Target{
{
AccountID: account.Id,
ServiceID: "service_id",
Host: "localhost",
Port: 8080,
Protocol: "http",
Enabled: true,
},
},
},
}
account.Domains = []*proxydomain.Domain{
{
ID: "domain_id",
Domain: "custom.example.com",
AccountID: account.Id,
Validated: true,
},
}
err = store.SaveAccount(context.Background(), account)
require.NoError(t, err)
@@ -411,6 +442,20 @@ func TestSqlite_DeleteAccount(t *testing.T) {
require.NoError(t, err, "expecting no error after removing DeleteAccount when searching for network resources")
require.Len(t, resources, 0, "expecting no network resources to be found after DeleteAccount")
}
domains, err := store.ListCustomDomains(context.Background(), account.Id)
require.NoError(t, err, "expecting no error after DeleteAccount when searching for custom domains")
require.Len(t, domains, 0, "expecting no custom domains to be found after DeleteAccount")
var services []*rpservice.Service
err = store.(*SqlStore).db.Model(&rpservice.Service{}).Find(&services, "account_id = ?", account.Id).Error
require.NoError(t, err, "expecting no error after DeleteAccount when searching for services")
require.Len(t, services, 0, "expecting no services to be found after DeleteAccount")
var targets []*rpservice.Target
err = store.(*SqlStore).db.Model(&rpservice.Target{}).Find(&targets, "account_id = ?", account.Id).Error
require.NoError(t, err, "expecting no error after DeleteAccount when searching for service targets")
require.Len(t, targets, 0, "expecting no service targets to be found after DeleteAccount")
}
func Test_GetAccount(t *testing.T) {

View File

@@ -20,6 +20,7 @@ import (
"github.com/stretchr/testify/assert"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
@@ -265,6 +266,7 @@ func setupBenchmarkDB(b testing.TB) (*SqlStore, func(), string) {
&nbdns.NameServerGroup{}, &posture.Checks{}, &networkTypes.Network{},
&routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{},
&types.AccountOnboarding{}, &service.Service{}, &service.Target{},
&domain.Domain{},
}
for i := len(models) - 1; i >= 0; i-- {

View File

@@ -448,6 +448,12 @@ func getMigrationsPreAuto(ctx context.Context) []migrationFunc {
func(db *gorm.DB) error {
return migration.RemoveDuplicatePeerKeys(ctx, db)
},
func(db *gorm.DB) error {
return migration.CleanupOrphanedResources[rpservice.Service, types.Account](ctx, db, "account_id")
},
func(db *gorm.DB) error {
return migration.CleanupOrphanedResources[domain.Domain, types.Account](ctx, db, "account_id")
},
}
}

View File

@@ -18,6 +18,7 @@ import (
"github.com/netbirdio/netbird/client/ssh/auth"
nbdns "github.com/netbirdio/netbird/dns"
proxydomain "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/domain"
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/service"
"github.com/netbirdio/netbird/management/internals/modules/zones"
"github.com/netbirdio/netbird/management/internals/modules/zones/records"
@@ -101,6 +102,7 @@ type Account struct {
DNSSettings DNSSettings `gorm:"embedded;embeddedPrefix:dns_settings_"`
PostureChecks []*posture.Checks `gorm:"foreignKey:AccountID;references:id"`
Services []*service.Service `gorm:"foreignKey:AccountID;references:id"`
Domains []*proxydomain.Domain `gorm:"foreignKey:AccountID;references:id"`
// Settings is a dictionary of Account settings
Settings *Settings `gorm:"embedded;embeddedPrefix:settings_"`
Networks []*networkTypes.Network `gorm:"foreignKey:AccountID;references:id"`
@@ -911,6 +913,11 @@ func (a *Account) Copy() *Account {
services = append(services, svc.Copy())
}
domains := []*proxydomain.Domain{}
for _, domain := range a.Domains {
domains = append(domains, domain.Copy())
}
return &Account{
Id: a.Id,
CreatedBy: a.CreatedBy,
@@ -936,6 +943,7 @@ func (a *Account) Copy() *Account {
Onboarding: a.Onboarding,
NetworkMapCache: a.NetworkMapCache,
nmapInitOnce: a.nmapInitOnce,
Domains: domains,
}
}

View File

@@ -4751,6 +4751,7 @@ components:
enum:
- email
- webhook
- slack
example: "email"
NotificationEventType:
type: string
@@ -4804,6 +4805,7 @@ components:
Channel-specific target configuration. The shape depends on the `type` field:
- `email`: requires an `EmailTarget` object
- `webhook`: requires a `WebhookTarget` object
- `slack`: requires a `WebhookTarget` object
oneOf:
- $ref: '#/components/schemas/EmailTarget'
- $ref: '#/components/schemas/WebhookTarget'
@@ -4837,6 +4839,7 @@ components:
Channel-specific target configuration. The shape depends on the `type` field:
- `email`: an `EmailTarget` object
- `webhook`: a `WebhookTarget` object
- `slack`: a `WebhookTarget` object
oneOf:
- $ref: '#/components/schemas/EmailTarget'
- $ref: '#/components/schemas/WebhookTarget'

View File

@@ -686,6 +686,7 @@ func (e NetworkResourceType) Valid() bool {
// Defines values for NotificationChannelType.
const (
NotificationChannelTypeEmail NotificationChannelType = "email"
NotificationChannelTypeSlack NotificationChannelType = "slack"
NotificationChannelTypeWebhook NotificationChannelType = "webhook"
)
@@ -694,6 +695,8 @@ func (e NotificationChannelType) Valid() bool {
switch e {
case NotificationChannelTypeEmail:
return true
case NotificationChannelTypeSlack:
return true
case NotificationChannelTypeWebhook:
return true
default:

View File

@@ -333,7 +333,7 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
dialers := c.getDialers()
rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...)
conn, err := rd.Dial()
conn, err := rd.Dial(ctx)
if err != nil {
return nil, err
}

View File

@@ -40,10 +40,10 @@ func NewRaceDial(log *log.Entry, connectionTimeout time.Duration, serverURL stri
}
}
func (r *RaceDial) Dial() (net.Conn, error) {
func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
connChan := make(chan dialResult, len(r.dialerFns))
winnerConn := make(chan net.Conn, 1)
abortCtx, abort := context.WithCancel(context.Background())
abortCtx, abort := context.WithCancel(ctx)
defer abort()
for _, dfn := range r.dialerFns {

View File

@@ -78,7 +78,7 @@ func TestRaceDialEmptyDialers(t *testing.T) {
serverURL := "test.server.com"
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err == nil {
t.Errorf("Expected an error with empty dialers, got nil")
}
@@ -104,7 +104,7 @@ func TestRaceDialSingleSuccessfulDialer(t *testing.T) {
}
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
@@ -137,7 +137,7 @@ func TestRaceDialMultipleDialersWithOneSuccess(t *testing.T) {
}
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
@@ -160,7 +160,7 @@ func TestRaceDialTimeout(t *testing.T) {
}
rd := NewRaceDial(logger, 3*time.Second, serverURL, mockDialer)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err == nil {
t.Errorf("Expected an error, got nil")
}
@@ -188,7 +188,7 @@ func TestRaceDialAllDialersFail(t *testing.T) {
}
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err == nil {
t.Errorf("Expected an error, got nil")
}
@@ -230,7 +230,7 @@ func TestRaceDialFirstSuccessfulDialerWins(t *testing.T) {
}
rd := NewRaceDial(logger, DefaultConnectionTimeout, serverURL, mockDialer1, mockDialer2)
conn, err := rd.Dial()
conn, err := rd.Dial(context.Background())
if err != nil {
t.Errorf("Expected no error, got %v", err)
}