mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 08:46:38 +00:00
Merge remote-tracking branch 'origin/prototype/reverse-proxy-clusters' into prototype/reverse-proxy
# Conflicts: # management/internals/modules/reverseproxy/manager/manager.go # management/internals/modules/reverseproxy/reverseproxy.go # management/internals/server/modules.go # management/internals/shared/grpc/proxy.go # management/server/http/handler.go # management/server/http/testing/testing_tools/channel/channel.go
This commit is contained in:
@@ -5,6 +5,10 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/xid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy"
|
||||
"github.com/netbirdio/netbird/management/internals/modules/reverseproxy/sessionkey"
|
||||
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
||||
@@ -17,21 +21,29 @@ import (
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
)
|
||||
|
||||
// ClusterDeriver derives the proxy cluster from a domain.
|
||||
type ClusterDeriver interface {
|
||||
DeriveClusterFromDomain(ctx context.Context, domain string) (string, error)
|
||||
}
|
||||
|
||||
type managerImpl struct {
|
||||
store store.Store
|
||||
accountManager account.Manager
|
||||
permissionsManager permissions.Manager
|
||||
proxyGRPCServer *nbgrpc.ProxyServiceServer
|
||||
tokenStore *nbgrpc.OneTimeTokenStore
|
||||
clusterDeriver ClusterDeriver
|
||||
}
|
||||
|
||||
func NewManager(store store.Store, accountManager account.Manager, permissionsManager permissions.Manager, proxyGRPCServer *nbgrpc.ProxyServiceServer, tokenStore *nbgrpc.OneTimeTokenStore) reverseproxy.Manager {
|
||||
// NewManager creates a new reverse proxy manager.
|
||||
func NewManager(store store.Store, accountManager account.Manager, permissionsManager permissions.Manager, proxyGRPCServer *nbgrpc.ProxyServiceServer, tokenStore *nbgrpc.OneTimeTokenStore, clusterDeriver ClusterDeriver) reverseproxy.Manager {
|
||||
return &managerImpl{
|
||||
store: store,
|
||||
accountManager: accountManager,
|
||||
permissionsManager: permissionsManager,
|
||||
proxyGRPCServer: proxyGRPCServer,
|
||||
tokenStore: tokenStore,
|
||||
clusterDeriver: clusterDeriver,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,9 +80,17 @@ func (m *managerImpl) CreateReverseProxy(ctx context.Context, accountID, userID
|
||||
return nil, status.NewPermissionDeniedError()
|
||||
}
|
||||
|
||||
var proxyCluster string
|
||||
if m.clusterDeriver != nil {
|
||||
proxyCluster, err = m.clusterDeriver.DeriveClusterFromDomain(ctx, reverseProxy.Domain)
|
||||
if err != nil {
|
||||
log.WithError(err).Warnf("could not derive cluster from domain %s, updates will broadcast to all proxies", reverseProxy.Domain)
|
||||
}
|
||||
}
|
||||
|
||||
authConfig := reverseProxy.Auth
|
||||
|
||||
reverseProxy = reverseproxy.NewReverseProxy(accountID, reverseProxy.Name, reverseProxy.Domain, reverseProxy.Targets, reverseProxy.Enabled)
|
||||
reverseProxy = reverseproxy.NewReverseProxy(accountID, reverseProxy.Name, reverseProxy.Domain, proxyCluster, reverseProxy.Targets, reverseProxy.Enabled)
|
||||
|
||||
reverseProxy.Auth = authConfig
|
||||
|
||||
@@ -111,7 +131,7 @@ func (m *managerImpl) CreateReverseProxy(ctx context.Context, accountID, userID
|
||||
|
||||
m.accountManager.StoreEvent(ctx, userID, reverseProxy.ID, accountID, activity.ReverseProxyCreated, reverseProxy.EventMeta())
|
||||
|
||||
m.proxyGRPCServer.SendReverseProxyUpdate(reverseProxy.ToProtoMapping(reverseproxy.Create, token, m.proxyGRPCServer.GetOIDCValidationConfig()))
|
||||
m.proxyGRPCServer.SendReverseProxyUpdateToCluster(reverseProxy.ToProtoMapping(reverseproxy.Create, token), m.proxyGRPCServer.GetOIDCValidationConfig(), reverseProxy.ProxyCluster))
|
||||
|
||||
return reverseProxy, nil
|
||||
}
|
||||
@@ -125,28 +145,44 @@ func (m *managerImpl) UpdateReverseProxy(ctx context.Context, accountID, userID
|
||||
return nil, status.NewPermissionDeniedError()
|
||||
}
|
||||
|
||||
var oldCluster string
|
||||
var domainChanged bool
|
||||
|
||||
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
// Get existing reverse proxy
|
||||
existingReverseProxy, err := transaction.GetReverseProxyByID(ctx, store.LockingStrengthUpdate, accountID, reverseProxy.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if domain changed and if it conflicts
|
||||
oldCluster = existingReverseProxy.ProxyCluster
|
||||
|
||||
if existingReverseProxy.Domain != reverseProxy.Domain {
|
||||
domainChanged = true
|
||||
conflictReverseProxy, err := transaction.GetReverseProxyByDomain(ctx, accountID, reverseProxy.Domain)
|
||||
if err != nil {
|
||||
if sErr, ok := status.FromError(err); !ok || sErr.Type() != status.NotFound {
|
||||
return fmt.Errorf("failed to check existing reverse proxy: %w", err)
|
||||
return fmt.Errorf("check existing reverse proxy: %w", err)
|
||||
}
|
||||
}
|
||||
if conflictReverseProxy != nil && conflictReverseProxy.ID != reverseProxy.ID {
|
||||
return status.Errorf(status.AlreadyExists, "reverse proxy with domain %s already exists", reverseProxy.Domain)
|
||||
}
|
||||
|
||||
if m.clusterDeriver != nil {
|
||||
newCluster, err := m.clusterDeriver.DeriveClusterFromDomain(ctx, reverseProxy.Domain)
|
||||
if err != nil {
|
||||
log.WithError(err).Warnf("could not derive cluster from domain %s", reverseProxy.Domain)
|
||||
}
|
||||
reverseProxy.ProxyCluster = newCluster
|
||||
}
|
||||
} else {
|
||||
reverseProxy.ProxyCluster = existingReverseProxy.ProxyCluster
|
||||
}
|
||||
|
||||
reverseProxy.Meta = existingReverseProxy.Meta
|
||||
|
||||
if err = transaction.UpdateReverseProxy(ctx, reverseProxy); err != nil {
|
||||
return fmt.Errorf("failed to update reverse proxy: %w", err)
|
||||
return fmt.Errorf("update reverse proxy: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -157,7 +193,12 @@ func (m *managerImpl) UpdateReverseProxy(ctx context.Context, accountID, userID
|
||||
|
||||
m.accountManager.StoreEvent(ctx, userID, reverseProxy.ID, accountID, activity.ReverseProxyUpdated, reverseProxy.EventMeta())
|
||||
|
||||
m.proxyGRPCServer.SendReverseProxyUpdate(reverseProxy.ToProtoMapping(reverseproxy.Update, "", m.proxyGRPCServer.GetOIDCValidationConfig()))
|
||||
if domainChanged && oldCluster != reverseProxy.ProxyCluster {
|
||||
m.proxyGRPCServer.SendReverseProxyUpdateToCluster(reverseProxy.ToProtoMapping(reverseproxy.Delete, ""), oldCluster, m.proxyGRPCServer.GetOIDCValidationConfig())
|
||||
m.proxyGRPCServer.SendReverseProxyUpdateToCluster(reverseProxy.ToProtoMapping(reverseproxy.Create, ""), reverseProxy.ProxyCluster, m.proxyGRPCServer.GetOIDCValidationConfig())
|
||||
} else {
|
||||
m.proxyGRPCServer.SendReverseProxyUpdateToCluster(reverseProxy.ToProtoMapping(reverseproxy.Update, ""), reverseProxy.ProxyCluster, m.proxyGRPCServer.GetOIDCValidationConfig())
|
||||
}
|
||||
|
||||
return reverseProxy, nil
|
||||
}
|
||||
@@ -191,7 +232,7 @@ func (m *managerImpl) DeleteReverseProxy(ctx context.Context, accountID, userID,
|
||||
|
||||
m.accountManager.StoreEvent(ctx, userID, reverseProxyID, accountID, activity.ReverseProxyDeleted, reverseProxy.EventMeta())
|
||||
|
||||
m.proxyGRPCServer.SendReverseProxyUpdate(reverseProxy.ToProtoMapping(reverseproxy.Delete, "", m.proxyGRPCServer.GetOIDCValidationConfig()))
|
||||
m.proxyGRPCServer.SendReverseProxyUpdateToCluster(reverseProxy.ToProtoMapping(reverseproxy.Delete, ""), reverseProxy.ProxyCluster, m.proxyGRPCServer.GetOIDCValidationConfig())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user