merge main

This commit is contained in:
crn4
2026-03-24 14:50:03 +01:00
269 changed files with 20324 additions and 3434 deletions

View File

@@ -13,9 +13,10 @@ import (
type Manager interface {
Connect(ctx context.Context, proxyID, clusterAddress, ipAddress string, accountID *string) error
Disconnect(ctx context.Context, proxyID string) error
Heartbeat(ctx context.Context, proxyID string) error
Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
GetActiveClusterAddresses(ctx context.Context) ([]string, error)
GetActiveClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error)
GetActiveClusters(ctx context.Context) ([]Cluster, error)
CleanupStale(ctx context.Context, inactivityDuration time.Duration) error
GetAccountProxy(ctx context.Context, accountID string) (*Proxy, error)
CountAccountProxies(ctx context.Context, accountID string) (int64, error)
@@ -38,4 +39,6 @@ type Controller interface {
RegisterProxyToCluster(ctx context.Context, clusterAddr, proxyID string) error
UnregisterProxyFromCluster(ctx context.Context, clusterAddr, proxyID string) error
GetProxiesForCluster(clusterAddr string) []string
ClusterSupportsCustomPorts(clusterAddr string) *bool
ClusterRequireSubdomain(clusterAddr string) *bool
}

View File

@@ -72,6 +72,17 @@ func (c *GRPCController) UnregisterProxyFromCluster(ctx context.Context, cluster
return nil
}
// ClusterSupportsCustomPorts returns whether any proxy in the cluster supports custom ports.
func (c *GRPCController) ClusterSupportsCustomPorts(clusterAddr string) *bool {
return c.proxyGRPCServer.ClusterSupportsCustomPorts(clusterAddr)
}
// ClusterRequireSubdomain returns whether the cluster requires a subdomain label.
// Returns nil when no proxy has reported the capability (defaults to false).
func (c *GRPCController) ClusterRequireSubdomain(clusterAddr string) *bool {
return c.proxyGRPCServer.ClusterRequireSubdomain(clusterAddr)
}
// GetProxiesForCluster returns all proxy IDs registered for a specific cluster.
func (c *GRPCController) GetProxiesForCluster(clusterAddr string) []string {
proxySet, ok := c.clusterProxies.Load(clusterAddr)

View File

@@ -14,9 +14,10 @@ import (
type store interface {
SaveProxy(ctx context.Context, p *proxy.Proxy) error
DisconnectProxy(ctx context.Context, proxyID string) error
UpdateProxyHeartbeat(ctx context.Context, proxyID string) error
UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error)
GetActiveProxyClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error)
GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error)
CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error
GetProxyByAccountID(ctx context.Context, accountID string) (*proxy.Proxy, error)
CountProxiesByAccountID(ctx context.Context, accountID string) (int64, error)
@@ -85,11 +86,13 @@ func (m *Manager) Disconnect(ctx context.Context, proxyID string) error {
}
// Heartbeat updates the proxy's last seen timestamp
func (m *Manager) Heartbeat(ctx context.Context, proxyID string) error {
if err := m.store.UpdateProxyHeartbeat(ctx, proxyID); err != nil {
func (m *Manager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
if err := m.store.UpdateProxyHeartbeat(ctx, proxyID, clusterAddress, ipAddress); err != nil {
log.WithContext(ctx).Debugf("failed to update proxy %s heartbeat: %v", proxyID, err)
return err
}
log.WithContext(ctx).Tracef("updated heartbeat for proxy %s", proxyID)
m.metrics.IncrementProxyHeartbeatCount()
return nil
}
@@ -104,6 +107,16 @@ func (m *Manager) GetActiveClusterAddresses(ctx context.Context) ([]string, erro
return addresses, nil
}
// GetActiveClusters returns all active proxy clusters with their connected proxy count.
func (m Manager) GetActiveClusters(ctx context.Context) ([]proxy.Cluster, error) {
clusters, err := m.store.GetActiveProxyClusters(ctx)
if err != nil {
log.WithContext(ctx).Errorf("failed to get active proxy clusters: %v", err)
return nil, err
}
return clusters, nil
}
// CleanupStale removes proxies that haven't sent heartbeat in the specified duration
func (m *Manager) CleanupStale(ctx context.Context, inactivityDuration time.Duration) error {
if err := m.store.CleanupStaleProxies(ctx, inactivityDuration); err != nil {

View File

@@ -16,7 +16,7 @@ import (
type mockStore struct {
saveProxyFunc func(ctx context.Context, p *proxy.Proxy) error
disconnectProxyFunc func(ctx context.Context, proxyID string) error
updateProxyHeartbeatFunc func(ctx context.Context, proxyID string) error
updateProxyHeartbeatFunc func(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
getActiveProxyClusterAddressesFunc func(ctx context.Context) ([]string, error)
getActiveProxyClusterAddressesForAccFunc func(ctx context.Context, accountID string) ([]string, error)
cleanupStaleProxiesFunc func(ctx context.Context, d time.Duration) error
@@ -38,9 +38,9 @@ func (m *mockStore) DisconnectProxy(ctx context.Context, proxyID string) error {
}
return nil
}
func (m *mockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID string) error {
func (m *mockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
if m.updateProxyHeartbeatFunc != nil {
return m.updateProxyHeartbeatFunc(ctx, proxyID)
return m.updateProxyHeartbeatFunc(ctx, proxyID, clusterAddress, ipAddress)
}
return nil
}
@@ -56,6 +56,9 @@ func (m *mockStore) GetActiveProxyClusterAddressesForAccount(ctx context.Context
}
return nil, nil
}
func (m *mockStore) GetActiveProxyClusters(_ context.Context) ([]proxy.Cluster, error) {
return nil, nil
}
func (m *mockStore) CleanupStaleProxies(ctx context.Context, d time.Duration) error {
if m.cleanupStaleProxiesFunc != nil {
return m.cleanupStaleProxiesFunc(ctx, d)

View File

@@ -93,7 +93,6 @@ func (mr *MockManagerMockRecorder) GetActiveClusterAddresses(ctx interface{}) *g
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveClusterAddresses", reflect.TypeOf((*MockManager)(nil).GetActiveClusterAddresses), ctx)
}
// GetActiveClusterAddressesForAccount mocks base method.
func (m *MockManager) GetActiveClusterAddressesForAccount(ctx context.Context, accountID string) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetActiveClusterAddressesForAccount", ctx, accountID)
@@ -102,24 +101,37 @@ func (m *MockManager) GetActiveClusterAddressesForAccount(ctx context.Context, a
return ret0, ret1
}
// GetActiveClusterAddressesForAccount indicates an expected call of GetActiveClusterAddressesForAccount.
func (mr *MockManagerMockRecorder) GetActiveClusterAddressesForAccount(ctx, accountID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveClusterAddressesForAccount", reflect.TypeOf((*MockManager)(nil).GetActiveClusterAddressesForAccount), ctx, accountID)
}
// Heartbeat mocks base method.
func (m *MockManager) Heartbeat(ctx context.Context, proxyID string) error {
func (m *MockManager) GetActiveClusters(ctx context.Context) ([]Cluster, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID)
ret := m.ctrl.Call(m, "GetActiveClusters", ctx)
ret0, _ := ret[0].([]Cluster)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetActiveClusters indicates an expected call of GetActiveClusters.
func (mr *MockManagerMockRecorder) GetActiveClusters(ctx interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActiveClusters", reflect.TypeOf((*MockManager)(nil).GetActiveClusters), ctx)
}
// Heartbeat mocks base method.
func (m *MockManager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID, clusterAddress, ipAddress)
ret0, _ := ret[0].(error)
return ret0
}
// Heartbeat indicates an expected call of Heartbeat.
func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID interface{}) *gomock.Call {
func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID, clusterAddress, ipAddress interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID, clusterAddress, ipAddress)
}
// GetAccountProxy mocks base method.
@@ -204,6 +216,34 @@ func (m *MockController) EXPECT() *MockControllerMockRecorder {
return m.recorder
}
// ClusterSupportsCustomPorts mocks base method.
func (m *MockController) ClusterSupportsCustomPorts(clusterAddr string) *bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClusterSupportsCustomPorts", clusterAddr)
ret0, _ := ret[0].(*bool)
return ret0
}
// ClusterSupportsCustomPorts indicates an expected call of ClusterSupportsCustomPorts.
func (mr *MockControllerMockRecorder) ClusterSupportsCustomPorts(clusterAddr interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClusterSupportsCustomPorts", reflect.TypeOf((*MockController)(nil).ClusterSupportsCustomPorts), clusterAddr)
}
// ClusterRequireSubdomain mocks base method.
func (m *MockController) ClusterRequireSubdomain(clusterAddr string) *bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClusterRequireSubdomain", clusterAddr)
ret0, _ := ret[0].(*bool)
return ret0
}
// ClusterRequireSubdomain indicates an expected call of ClusterRequireSubdomain.
func (mr *MockControllerMockRecorder) ClusterRequireSubdomain(clusterAddr interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClusterRequireSubdomain", reflect.TypeOf((*MockController)(nil).ClusterRequireSubdomain), clusterAddr)
}
// GetOIDCValidationConfig mocks base method.
func (m *MockController) GetOIDCValidationConfig() OIDCValidationConfig {
m.ctrl.T.Helper()

View File

@@ -29,3 +29,9 @@ type Proxy struct {
func (Proxy) TableName() string {
return "proxies"
}
// Cluster represents a group of proxy nodes serving the same address.
type Cluster struct {
Address string
ConnectedProxies int
}