mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[management] Fix L4 service creation deadlock on single-connection databases (#5779)
This commit is contained in:
@@ -288,6 +288,8 @@ func (m *Manager) validateSubdomainRequirement(ctx context.Context, domain, clus
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *service.Service) error {
|
func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *service.Service) error {
|
||||||
|
customPorts := m.clusterCustomPorts(ctx, svc)
|
||||||
|
|
||||||
return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||||
if svc.Domain != "" {
|
if svc.Domain != "" {
|
||||||
if err := m.checkDomainAvailable(ctx, transaction, svc.Domain, ""); err != nil {
|
if err := m.checkDomainAvailable(ctx, transaction, svc.Domain, ""); err != nil {
|
||||||
@@ -295,7 +297,7 @@ func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.ensureL4Port(ctx, transaction, svc); err != nil {
|
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -315,12 +317,23 @@ func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureL4Port auto-assigns a listen port when needed and validates cluster support.
|
// clusterCustomPorts queries whether the cluster supports custom ports.
|
||||||
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service) error {
|
// Must be called before entering a transaction: the underlying query uses
|
||||||
|
// the main DB handle, which deadlocks when called inside a transaction
|
||||||
|
// that already holds the connection.
|
||||||
|
func (m *Manager) clusterCustomPorts(ctx context.Context, svc *service.Service) *bool {
|
||||||
|
if !service.IsL4Protocol(svc.Mode) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return m.capabilities.ClusterSupportsCustomPorts(ctx, svc.ProxyCluster)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureL4Port auto-assigns a listen port when needed and validates cluster support.
|
||||||
|
// customPorts must be pre-computed via clusterCustomPorts before entering a transaction.
|
||||||
|
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service, customPorts *bool) error {
|
||||||
if !service.IsL4Protocol(svc.Mode) {
|
if !service.IsL4Protocol(svc.Mode) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
customPorts := m.capabilities.ClusterSupportsCustomPorts(ctx, svc.ProxyCluster)
|
|
||||||
if service.IsPortBasedProtocol(svc.Mode) && svc.ListenPort > 0 && (customPorts == nil || !*customPorts) {
|
if service.IsPortBasedProtocol(svc.Mode) && svc.ListenPort > 0 && (customPorts == nil || !*customPorts) {
|
||||||
if svc.Source != service.SourceEphemeral {
|
if svc.Source != service.SourceEphemeral {
|
||||||
return status.Errorf(status.InvalidArgument, "custom ports not supported on cluster %s", svc.ProxyCluster)
|
return status.Errorf(status.InvalidArgument, "custom ports not supported on cluster %s", svc.ProxyCluster)
|
||||||
@@ -404,12 +417,14 @@ func (m *Manager) assignPort(ctx context.Context, tx store.Store, cluster string
|
|||||||
// The count and exists queries use FOR UPDATE locking to serialize concurrent creates
|
// The count and exists queries use FOR UPDATE locking to serialize concurrent creates
|
||||||
// for the same peer, preventing the per-peer limit from being bypassed.
|
// for the same peer, preventing the per-peer limit from being bypassed.
|
||||||
func (m *Manager) persistNewEphemeralService(ctx context.Context, accountID, peerID string, svc *service.Service) error {
|
func (m *Manager) persistNewEphemeralService(ctx context.Context, accountID, peerID string, svc *service.Service) error {
|
||||||
|
customPorts := m.clusterCustomPorts(ctx, svc)
|
||||||
|
|
||||||
return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||||
if err := m.validateEphemeralPreconditions(ctx, transaction, accountID, peerID, svc); err != nil {
|
if err := m.validateEphemeralPreconditions(ctx, transaction, accountID, peerID, svc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.ensureL4Port(ctx, transaction, svc); err != nil {
|
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -512,16 +527,49 @@ type serviceUpdateInfo struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) persistServiceUpdate(ctx context.Context, accountID string, service *service.Service) (*serviceUpdateInfo, error) {
|
func (m *Manager) persistServiceUpdate(ctx context.Context, accountID string, service *service.Service) (*serviceUpdateInfo, error) {
|
||||||
|
effectiveCluster, err := m.resolveEffectiveCluster(ctx, accountID, service)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
svcForCaps := *service
|
||||||
|
svcForCaps.ProxyCluster = effectiveCluster
|
||||||
|
customPorts := m.clusterCustomPorts(ctx, &svcForCaps)
|
||||||
|
|
||||||
var updateInfo serviceUpdateInfo
|
var updateInfo serviceUpdateInfo
|
||||||
|
|
||||||
err := m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||||
return m.executeServiceUpdate(ctx, transaction, accountID, service, &updateInfo)
|
return m.executeServiceUpdate(ctx, transaction, accountID, service, &updateInfo, customPorts)
|
||||||
})
|
})
|
||||||
|
|
||||||
return &updateInfo, err
|
return &updateInfo, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.Store, accountID string, service *service.Service, updateInfo *serviceUpdateInfo) error {
|
// resolveEffectiveCluster determines the cluster that will be used after the update.
|
||||||
|
// It reads the existing service without locking and derives the new cluster if the domain changed.
|
||||||
|
func (m *Manager) resolveEffectiveCluster(ctx context.Context, accountID string, svc *service.Service) (string, error) {
|
||||||
|
existing, err := m.store.GetServiceByID(ctx, store.LockingStrengthNone, accountID, svc.ID)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
if existing.Domain == svc.Domain {
|
||||||
|
return existing.ProxyCluster, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.clusterDeriver != nil {
|
||||||
|
derived, err := m.clusterDeriver.DeriveClusterFromDomain(ctx, accountID, svc.Domain)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Warnf("could not derive cluster from domain %s", svc.Domain)
|
||||||
|
} else {
|
||||||
|
return derived, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return existing.ProxyCluster, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.Store, accountID string, service *service.Service, updateInfo *serviceUpdateInfo, customPorts *bool) error {
|
||||||
existingService, err := transaction.GetServiceByID(ctx, store.LockingStrengthUpdate, accountID, service.ID)
|
existingService, err := transaction.GetServiceByID(ctx, store.LockingStrengthUpdate, accountID, service.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -558,7 +606,7 @@ func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.St
|
|||||||
m.preserveListenPort(service, existingService)
|
m.preserveListenPort(service, existingService)
|
||||||
updateInfo.serviceEnabledChanged = existingService.Enabled != service.Enabled
|
updateInfo.serviceEnabledChanged = existingService.Enabled != service.Enabled
|
||||||
|
|
||||||
if err := m.ensureL4Port(ctx, transaction, service); err != nil {
|
if err := m.ensureL4Port(ctx, transaction, service, customPorts); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := m.checkPortConflict(ctx, transaction, service); err != nil {
|
if err := m.checkPortConflict(ctx, transaction, service); err != nil {
|
||||||
|
|||||||
@@ -787,6 +787,11 @@ func (s *Service) validateHTTPTargets() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) validateL4Target(target *Target) error {
|
func (s *Service) validateL4Target(target *Target) error {
|
||||||
|
// L4 services have a single target; per-target disable is meaningless
|
||||||
|
// (use the service-level Enabled flag instead). Force it on so that
|
||||||
|
// buildPathMappings always includes the target in the proto.
|
||||||
|
target.Enabled = true
|
||||||
|
|
||||||
if target.Port == 0 {
|
if target.Port == 0 {
|
||||||
return errors.New("target port is required for L4 services")
|
return errors.New("target port is required for L4 services")
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user