Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,14 +288,16 @@ func (m *Manager) validateSubdomainRequirement(ctx context.Context, domain, clus
}

func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *service.Service) error {
customPorts := m.clusterCustomPorts(ctx, svc)

return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if svc.Domain != "" {
if err := m.checkDomainAvailable(ctx, transaction, svc.Domain, ""); err != nil {
return err
}
}

if err := m.ensureL4Port(ctx, transaction, svc); err != nil {
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
return err
}

Expand All @@ -315,12 +317,23 @@ func (m *Manager) persistNewService(ctx context.Context, accountID string, svc *
})
}

// clusterCustomPorts queries whether the cluster supports custom ports.
// Must be called before entering a transaction: the underlying query uses
// the main DB handle, which deadlocks when called inside a transaction
// that already holds the connection.
func (m *Manager) clusterCustomPorts(ctx context.Context, svc *service.Service) *bool {
if !service.IsL4Protocol(svc.Mode) {
return nil
}
return m.capabilities.ClusterSupportsCustomPorts(ctx, svc.ProxyCluster)
}

// ensureL4Port auto-assigns a listen port when needed and validates cluster support.
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service) error {
// customPorts must be pre-computed via clusterCustomPorts before entering a transaction.
func (m *Manager) ensureL4Port(ctx context.Context, tx store.Store, svc *service.Service, customPorts *bool) error {
if !service.IsL4Protocol(svc.Mode) {
return nil
}
customPorts := m.capabilities.ClusterSupportsCustomPorts(ctx, svc.ProxyCluster)
if service.IsPortBasedProtocol(svc.Mode) && svc.ListenPort > 0 && (customPorts == nil || !*customPorts) {
if svc.Source != service.SourceEphemeral {
return status.Errorf(status.InvalidArgument, "custom ports not supported on cluster %s", svc.ProxyCluster)
Expand Down Expand Up @@ -404,12 +417,14 @@ func (m *Manager) assignPort(ctx context.Context, tx store.Store, cluster string
// The count and exists queries use FOR UPDATE locking to serialize concurrent creates
// for the same peer, preventing the per-peer limit from being bypassed.
func (m *Manager) persistNewEphemeralService(ctx context.Context, accountID, peerID string, svc *service.Service) error {
customPorts := m.clusterCustomPorts(ctx, svc)

return m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err := m.validateEphemeralPreconditions(ctx, transaction, accountID, peerID, svc); err != nil {
return err
}

if err := m.ensureL4Port(ctx, transaction, svc); err != nil {
if err := m.ensureL4Port(ctx, transaction, svc, customPorts); err != nil {
return err
}

Expand Down Expand Up @@ -512,16 +527,49 @@ type serviceUpdateInfo struct {
}

func (m *Manager) persistServiceUpdate(ctx context.Context, accountID string, service *service.Service) (*serviceUpdateInfo, error) {
effectiveCluster, err := m.resolveEffectiveCluster(ctx, accountID, service)
if err != nil {
return nil, err
}

svcForCaps := *service
svcForCaps.ProxyCluster = effectiveCluster
customPorts := m.clusterCustomPorts(ctx, &svcForCaps)

var updateInfo serviceUpdateInfo

err := m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
return m.executeServiceUpdate(ctx, transaction, accountID, service, &updateInfo)
err = m.store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
return m.executeServiceUpdate(ctx, transaction, accountID, service, &updateInfo, customPorts)
})

return &updateInfo, err
}

func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.Store, accountID string, service *service.Service, updateInfo *serviceUpdateInfo) error {
// resolveEffectiveCluster determines the cluster that will be used after the update.
// It reads the existing service without locking and derives the new cluster if the domain changed.
func (m *Manager) resolveEffectiveCluster(ctx context.Context, accountID string, svc *service.Service) (string, error) {
existing, err := m.store.GetServiceByID(ctx, store.LockingStrengthNone, accountID, svc.ID)
if err != nil {
return "", err
}

if existing.Domain == svc.Domain {
return existing.ProxyCluster, nil
}

if m.clusterDeriver != nil {
derived, err := m.clusterDeriver.DeriveClusterFromDomain(ctx, accountID, svc.Domain)
if err != nil {
log.WithError(err).Warnf("could not derive cluster from domain %s", svc.Domain)
} else {
return derived, nil
}
}

return existing.ProxyCluster, nil
}

func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.Store, accountID string, service *service.Service, updateInfo *serviceUpdateInfo, customPorts *bool) error {
existingService, err := transaction.GetServiceByID(ctx, store.LockingStrengthUpdate, accountID, service.ID)
if err != nil {
return err
Expand Down Expand Up @@ -558,7 +606,7 @@ func (m *Manager) executeServiceUpdate(ctx context.Context, transaction store.St
m.preserveListenPort(service, existingService)
updateInfo.serviceEnabledChanged = existingService.Enabled != service.Enabled

if err := m.ensureL4Port(ctx, transaction, service); err != nil {
if err := m.ensureL4Port(ctx, transaction, service, customPorts); err != nil {
return err
}
if err := m.checkPortConflict(ctx, transaction, service); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions management/internals/modules/reverseproxy/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,11 @@ func (s *Service) validateHTTPTargets() error {
}

func (s *Service) validateL4Target(target *Target) error {
// L4 services have a single target; per-target disable is meaningless
// (use the service-level Enabled flag instead). Force it on so that
// buildPathMappings always includes the target in the proto.
target.Enabled = true

if target.Port == 0 {
return errors.New("target port is required for L4 services")
}
Expand Down
Loading