feat(reverse-proxy): clusters API surfaces type, online status, and capability flags (#6148)

The cluster listing now answers three questions in one round-trip
instead of forcing the dashboard to cross-reference the domains API:
which clusters can this account see, are they currently up, and what
do they support. The ProxyCluster wire type drops the boolean
self_hosted in favour of a `type` enum (`account` / `shared`) plus
explicit `online`, `supports_custom_ports`, `require_subdomain`, and
`supports_crowdsec` fields.

Store query reworked so offline clusters still appear (no last_seen
WHERE), with online and connected_proxies both derived from the
existing 2-min active window via portable CASE expressions; the
1-hour heartbeat reaper still removes long-stale rows. Service
manager enriches each cluster with the capability flags via the
existing per-cluster lookups (CapabilityProvider now also exposes
ClusterSupportsCrowdSec).

GetActiveClusterAddresses* keep their tight 2-min filter so service
routing and domain enumeration aren't pulled into the wider window.

The hard cut removes self_hosted from the response — the dashboard is
the only consumer and is updated in the matching PR; no transitional
field is shipped.

Adds a cross-engine regression test asserting offline clusters
surface, connected_proxies counts only fresh proxies, and
account-scoped BYOP clusters never leak across accounts.
This commit is contained in:
Maycon Santos
2026-05-20 10:08:34 +02:00
committed by GitHub
parent 80966ab1b0
commit d250f92c43
17 changed files with 393 additions and 122 deletions

View File

@@ -444,7 +444,7 @@ func (m *testServiceManager) GetServiceByDomain(ctx context.Context, domain stri
return m.store.GetServiceByDomain(ctx, domain)
}
func (m *testServiceManager) GetActiveClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) {
func (m *testServiceManager) GetClusters(_ context.Context, _, _ string) ([]nbproxy.Cluster, error) {
return nil, nil
}

View File

@@ -5736,19 +5736,67 @@ func (s *SqlStore) DeleteAccountCluster(ctx context.Context, clusterAddress, acc
return nil
}
func (s *SqlStore) GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) {
var clusters []proxy.Cluster
// GetProxyClusters returns every cluster the account can see (shared
// plus its own BYOP), regardless of whether any proxy in the cluster
// is currently heartbeating. Online and ConnectedProxies are derived
// from the 2-min active window so the dashboard can render offline
// clusters distinctly; the 1-hour heartbeat reaper still removes rows
// that go quiet for too long.
//
// AccountOwned is determined by whether any proxy row in the group
// carries a non-NULL account_id; the caller maps that to Cluster.Type.
// Capability flags are NOT filled here — the handler enriches them via
// the per-cluster capability lookups.
func (s *SqlStore) GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) {
activeCutoff := time.Now().Add(-proxyActiveThreshold)
type clusterRow struct {
ID string
Address string
ConnectedProxies int
Online bool
AccountOwned bool
}
var rows []clusterRow
result := s.db.Model(&proxy.Proxy{}).
Select("MIN(id) as id, cluster_address as address, COUNT(*) as connected_proxies, COUNT(account_id) > 0 as self_hosted").
Where("status = ? AND last_seen > ? AND (account_id IS NULL OR account_id = ?)",
proxy.StatusConnected, time.Now().Add(-proxyActiveThreshold), accountID).
Select(
"MIN(id) AS id, "+
"cluster_address AS address, "+
// COUNT(CASE WHEN ... THEN 1 END) counts only non-NULL — i.e. only
// rows that satisfy the predicate — so it works portably across
// sqlite/postgres/mysql without dialect-specific FILTER syntax.
"COUNT(CASE WHEN status = ? AND last_seen > ? THEN 1 END) AS connected_proxies, "+
// MAX(CASE …) > 0 expresses BOOL_OR in a way Postgres tolerates
// (Postgres can't MAX a boolean column).
"MAX(CASE WHEN status = ? AND last_seen > ? THEN 1 ELSE 0 END) > 0 AS online, "+
"MAX(CASE WHEN account_id IS NOT NULL THEN 1 ELSE 0 END) > 0 AS account_owned",
proxy.StatusConnected, activeCutoff,
proxy.StatusConnected, activeCutoff,
).
Where("account_id IS NULL OR account_id = ?", accountID).
Group("cluster_address").
Scan(&clusters)
Scan(&rows)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to get active proxy clusters: %v", result.Error)
return nil, status.Errorf(status.Internal, "get active proxy clusters")
log.WithContext(ctx).Errorf("failed to get proxy clusters: %v", result.Error)
return nil, status.Errorf(status.Internal, "get proxy clusters")
}
clusters := make([]proxy.Cluster, 0, len(rows))
for _, r := range rows {
c := proxy.Cluster{
ID: r.ID,
Address: r.Address,
Online: r.Online,
ConnectedProxies: r.ConnectedProxies,
}
if r.AccountOwned {
c.Type = proxy.ClusterTypeAccount
} else {
c.Type = proxy.ClusterTypeShared
}
clusters = append(clusters, c)
}
return clusters, nil

View File

@@ -0,0 +1,109 @@
package store
import (
"context"
"os"
"runtime"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
rpproxy "github.com/netbirdio/netbird/management/internals/modules/reverseproxy/proxy"
)
// TestSqlStore_GetProxyClusters_DerivesOnlineAndType guards the
// account-visible cluster list against silent regressions in two
// dimensions:
//
// 1. Online derivation: a cluster with one stale and one fresh proxy
// is online and counts only the fresh proxy; a cluster whose
// proxies all heartbeated outside the 2-min window appears offline
// with connected_proxies = 0 (rather than disappearing, which is
// what the old query did).
// 2. Type derivation: a cluster scoped to the calling account is
// reported as `account`; a cluster with account_id IS NULL is
// reported as `shared`. Clusters scoped to other accounts must not
// leak into the result.
//
// Capability flags are intentionally not asserted here — they're filled
// by the manager (handler) layer from the per-cluster capability
// lookups, not by the store query.
func TestSqlStore_GetProxyClusters_DerivesOnlineAndType(t *testing.T) {
if (os.Getenv("CI") == "true" && runtime.GOOS == "darwin") || runtime.GOOS == "windows" {
t.Skip("skip CI tests on darwin and windows")
}
runTestForAllEngines(t, "", func(t *testing.T, store Store) {
ctx := context.Background()
accountID := "acct-clusters"
require.NoError(t, store.SaveAccount(ctx, newAccountWithId(ctx, accountID, "user-1", "")))
otherAccountID := "acct-other"
require.NoError(t, store.SaveAccount(ctx, newAccountWithId(ctx, otherAccountID, "user-2", "")))
acctID := accountID
otherID := otherAccountID
fresh := time.Now().Add(-30 * time.Second)
stale := time.Now().Add(-30 * time.Minute)
mustSave := func(id, cluster string, accID *string, status string, lastSeen time.Time) {
require.NoError(t, store.SaveProxy(ctx, &rpproxy.Proxy{
ID: id,
SessionID: id + "-sess",
ClusterAddress: cluster,
IPAddress: "10.0.0.1",
AccountID: accID,
LastSeen: lastSeen,
Status: status,
}))
}
// shared-mixed: one fresh + one stale proxy → online, connected=1
mustSave("p-shared-fresh", "shared-mixed.netbird.io", nil, rpproxy.StatusConnected, fresh)
mustSave("p-shared-stale", "shared-mixed.netbird.io", nil, rpproxy.StatusConnected, stale)
// shared-offline: only stale proxies → offline, connected=0,
// but row must still appear (this is the new semantic — old
// query would have dropped it entirely).
mustSave("p-shared-off", "shared-offline.netbird.io", nil, rpproxy.StatusConnected, stale)
// account-online: BYOP cluster owned by acctID, fresh
mustSave("p-acct-fresh", "byop.acct.example", &acctID, rpproxy.StatusConnected, fresh)
// other-account: must not surface for acctID
mustSave("p-other", "byop.other.example", &otherID, rpproxy.StatusConnected, fresh)
clusters, err := store.GetProxyClusters(ctx, accountID)
require.NoError(t, err)
byAddr := map[string]rpproxy.Cluster{}
for _, c := range clusters {
byAddr[c.Address] = c
}
assert.NotContains(t, byAddr, "byop.other.example",
"another account's BYOP cluster must not leak into this account's listing")
require.Contains(t, byAddr, "shared-mixed.netbird.io")
mixed := byAddr["shared-mixed.netbird.io"]
assert.Equal(t, rpproxy.ClusterTypeShared, mixed.Type, "shared cluster (account_id IS NULL) must be reported as Type=shared")
assert.True(t, mixed.Online, "cluster with a fresh proxy must be online")
assert.Equal(t, 1, mixed.ConnectedProxies, "connected_proxies must count only fresh proxies; the stale one should not bump the count")
require.Contains(t, byAddr, "shared-offline.netbird.io",
"offline clusters must still appear so the dashboard can render them — the old GetActiveProxyClusters would have dropped this row, which is the regression this test guards against")
offline := byAddr["shared-offline.netbird.io"]
assert.Equal(t, rpproxy.ClusterTypeShared, offline.Type)
assert.False(t, offline.Online, "no fresh heartbeat → offline")
assert.Equal(t, 0, offline.ConnectedProxies, "no fresh proxies → connected_proxies=0")
require.Contains(t, byAddr, "byop.acct.example")
acct := byAddr["byop.acct.example"]
assert.Equal(t, rpproxy.ClusterTypeAccount, acct.Type, "BYOP cluster owned by the account must be reported as Type=account")
assert.True(t, acct.Online)
assert.Equal(t, 1, acct.ConnectedProxies)
})
}

View File

@@ -307,7 +307,7 @@ type Store interface {
UpdateProxyHeartbeat(ctx context.Context, p *proxy.Proxy) error
GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error)
GetActiveProxyClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error)
GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error)
GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error)
GetClusterSupportsCustomPorts(ctx context.Context, clusterAddr string) *bool
GetClusterRequireSubdomain(ctx context.Context, clusterAddr string) *bool
GetClusterSupportsCrowdSec(ctx context.Context, clusterAddr string) *bool

View File

@@ -380,6 +380,20 @@ func (mr *MockStoreMockRecorder) DeleteAccount(ctx, account interface{}) *gomock
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccount", reflect.TypeOf((*MockStore)(nil).DeleteAccount), ctx, account)
}
// DeleteAccountCluster mocks base method.
func (m *MockStore) DeleteAccountCluster(ctx context.Context, clusterAddress, accountID string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteAccountCluster", ctx, clusterAddress, accountID)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteAccountCluster indicates an expected call of DeleteAccountCluster.
func (mr *MockStoreMockRecorder) DeleteAccountCluster(ctx, clusterAddress, accountID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccountCluster", reflect.TypeOf((*MockStore)(nil).DeleteAccountCluster), ctx, clusterAddress, accountID)
}
// DeleteCustomDomain mocks base method.
func (m *MockStore) DeleteCustomDomain(ctx context.Context, accountID, domainID string) error {
m.ctrl.T.Helper()
@@ -577,20 +591,6 @@ func (mr *MockStoreMockRecorder) DeletePostureChecks(ctx, accountID, postureChec
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeletePostureChecks", reflect.TypeOf((*MockStore)(nil).DeletePostureChecks), ctx, accountID, postureChecksID)
}
// DeleteAccountCluster mocks base method.
func (m *MockStore) DeleteAccountCluster(ctx context.Context, clusterAddress, accountID string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteAccountCluster", ctx, clusterAddress, accountID)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteAccountCluster indicates an expected call of DeleteAccountCluster.
func (mr *MockStoreMockRecorder) DeleteAccountCluster(ctx, clusterAddress, accountID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAccountCluster", reflect.TypeOf((*MockStore)(nil).DeleteAccountCluster), ctx, clusterAddress, accountID)
}
// DeleteRoute mocks base method.
func (m *MockStore) DeleteRoute(ctx context.Context, accountID, routeID string) error {
m.ctrl.T.Helper()
@@ -731,6 +731,20 @@ func (mr *MockStoreMockRecorder) DeleteZoneDNSRecords(ctx, accountID, zoneID int
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteZoneDNSRecords", reflect.TypeOf((*MockStore)(nil).DeleteZoneDNSRecords), ctx, accountID, zoneID)
}
// DisconnectProxy mocks base method.
func (m *MockStore) DisconnectProxy(ctx context.Context, proxyID, sessionID string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DisconnectProxy", ctx, proxyID, sessionID)
ret0, _ := ret[0].(error)
return ret0
}
// DisconnectProxy indicates an expected call of DisconnectProxy.
func (mr *MockStoreMockRecorder) DisconnectProxy(ctx, proxyID, sessionID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisconnectProxy", reflect.TypeOf((*MockStore)(nil).DisconnectProxy), ctx, proxyID, sessionID)
}
// EphemeralServiceExists mocks base method.
func (m *MockStore) EphemeralServiceExists(ctx context.Context, lockStrength LockingStrength, accountID, peerID, domain string) (bool, error) {
m.ctrl.T.Helper()
@@ -1332,21 +1346,6 @@ func (mr *MockStoreMockRecorder) GetActiveProxyClusterAddressesForAccount(ctx, a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveProxyClusterAddressesForAccount", reflect.TypeOf((*MockStore)(nil).GetActiveProxyClusterAddressesForAccount), ctx, accountID)
}
// GetActiveProxyClusters mocks base method.
func (m *MockStore) GetActiveProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetActiveProxyClusters", ctx, accountID)
ret0, _ := ret[0].([]proxy.Cluster)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetActiveProxyClusters indicates an expected call of GetActiveProxyClusters.
func (mr *MockStoreMockRecorder) GetActiveProxyClusters(ctx, accountID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveProxyClusters", reflect.TypeOf((*MockStore)(nil).GetActiveProxyClusters), ctx, accountID)
}
// GetAllAccounts mocks base method.
func (m *MockStore) GetAllAccounts(ctx context.Context) []*types2.Account {
m.ctrl.T.Helper()
@@ -2048,6 +2047,21 @@ func (mr *MockStoreMockRecorder) GetProxyByAccountID(ctx, accountID interface{})
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyByAccountID", reflect.TypeOf((*MockStore)(nil).GetProxyByAccountID), ctx, accountID)
}
// GetProxyClusters mocks base method.
func (m *MockStore) GetProxyClusters(ctx context.Context, accountID string) ([]proxy.Cluster, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetProxyClusters", ctx, accountID)
ret0, _ := ret[0].([]proxy.Cluster)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetProxyClusters indicates an expected call of GetProxyClusters.
func (mr *MockStoreMockRecorder) GetProxyClusters(ctx, accountID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProxyClusters", reflect.TypeOf((*MockStore)(nil).GetProxyClusters), ctx, accountID)
}
// GetResourceGroups mocks base method.
func (m *MockStore) GetResourceGroups(ctx context.Context, lockStrength LockingStrength, accountID, resourceID string) ([]*types2.Group, error) {
m.ctrl.T.Helper()
@@ -2950,20 +2964,6 @@ func (mr *MockStoreMockRecorder) SaveProxy(ctx, proxy interface{}) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveProxy", reflect.TypeOf((*MockStore)(nil).SaveProxy), ctx, proxy)
}
// DisconnectProxy mocks base method.
func (m *MockStore) DisconnectProxy(ctx context.Context, proxyID, sessionID string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DisconnectProxy", ctx, proxyID, sessionID)
ret0, _ := ret[0].(error)
return ret0
}
// DisconnectProxy indicates an expected call of DisconnectProxy.
func (mr *MockStoreMockRecorder) DisconnectProxy(ctx, proxyID, sessionID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisconnectProxy", reflect.TypeOf((*MockStore)(nil).DisconnectProxy), ctx, proxyID, sessionID)
}
// SaveProxyAccessToken mocks base method.
func (m *MockStore) SaveProxyAccessToken(ctx context.Context, token *types2.ProxyAccessToken) error {
m.ctrl.T.Helper()