diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index e629d23ac7a4..03be31c6e14a 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -163,8 +163,18 @@ public final class SystemSessionProperties public static final String RETRY_DELAY_SCALE_FACTOR = "retry_delay_scale_factor"; public static final String LEGACY_MATERIALIZED_VIEW_GRACE_PERIOD = "legacy_materialized_view_grace_period"; public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns"; - public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE = "fault_tolerant_execution_target_task_input_size"; - public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT = "fault_tolerant_execution_target_task_split_count"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_PERIOD = "fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_growth_period"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_FACTOR = "fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_growth_factor"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MIN = "fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_min"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MAX = "fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_max"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_PERIOD = "fault_tolerant_execution_arbitrary_distribution_write_task_target_size_growth_period"; + public static final String 
FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_FACTOR = "fault_tolerant_execution_arbitrary_distribution_write_task_target_size_growth_factor"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN = "fault_tolerant_execution_arbitrary_distribution_write_task_target_size_min"; + public static final String FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MAX = "fault_tolerant_execution_arbitrary_distribution_write_task_target_size_max"; + public static final String FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE = "fault_tolerant_execution_hash_distribution_compute_task_target_size"; + public static final String FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE = "fault_tolerant_execution_hash_distribution_write_task_target_size"; + public static final String FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_MAX_COUNT = "fault_tolerant_execution_hash_distribution_write_task_target_max_count"; + public static final String FAULT_TOLERANT_EXECUTION_STANDARD_SPLIT_SIZE = "fault_tolerant_execution_standard_split_size"; public static final String FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT = "fault_tolerant_execution_max_task_split_count"; public static final String FAULT_TOLERANT_EXECUTION_COORDINATOR_TASK_MEMORY = "fault_tolerant_execution_coordinator_task_memory"; public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory"; @@ -815,15 +825,65 @@ public SystemSessionProperties( featuresConfig.isHideInaccessibleColumns(), value -> validateHideInaccessibleColumns(value, featuresConfig.isHideInaccessibleColumns()), false), + integerProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_PERIOD, + "The number of tasks we create for given non-writer stage of arbitrary distribution before we increase task size", + 
queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(), + true), + doubleProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_FACTOR, + "Growth factor for adaptive sizing of non-writer tasks of arbitrary distribution for fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(), + true), dataSizeProperty( - FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, - "Target size in bytes of all task inputs for a single fault tolerant task", - queryManagerConfig.getFaultTolerantExecutionTargetTaskInputSize(), - false), + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MIN, + "Initial/min target input size for non-writer tasks of arbitrary distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(), + true), + dataSizeProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MAX, + "Max target input size for non-writer task of arbitrary distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(), + true), + integerProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_PERIOD, + "The number of tasks we create for given writer stage of arbitrary distribution before we increase task size", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(), + true), + doubleProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_FACTOR, + "Growth factor for adaptive sizing of writer tasks of arbitrary distribution for fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(), + true), + dataSizeProperty( + 
FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN, + "Initial/min target input size for writer tasks of arbitrary distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(), + true), + dataSizeProperty( + FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MAX, + "Max target input size for writer tasks of arbitrary distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(), + true), + dataSizeProperty( + FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE, + "Target input size for non-writer tasks of hash distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionHashDistributionComputeTaskTargetSize(), + true), + dataSizeProperty( + FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE, + "Target input size of writer tasks of hash distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionHashDistributionWriteTaskTargetSize(), + true), integerProperty( - FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT, - "Target number of splits for a single fault tolerant task (split weight aware)", - queryManagerConfig.getFaultTolerantExecutionTargetTaskSplitCount(), + FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_MAX_COUNT, + "Soft upper bound on number of writer tasks in a stage of hash distribution of fault-tolerant execution", + queryManagerConfig.getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(), + true), + dataSizeProperty( + FAULT_TOLERANT_EXECUTION_STANDARD_SPLIT_SIZE, + "Standard split size for a single fault tolerant task (split weight aware)", + queryManagerConfig.getFaultTolerantExecutionStandardSplitSize(), false), integerProperty( FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT, @@ -1548,14 +1608,64 @@ public static boolean 
isHideInaccessibleColumns(Session session) return session.getSystemProperty(HIDE_INACCESSIBLE_COLUMNS, Boolean.class); } - public static DataSize getFaultTolerantExecutionTargetTaskInputSize(Session session) + public static int getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_PERIOD, Integer.class); + } + + public static double getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_GROWTH_FACTOR, Double.class); + } + + public static DataSize getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MIN, DataSize.class); + } + + public static DataSize getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MAX, DataSize.class); + } + + public static int getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_PERIOD, Integer.class); + } + + public static double getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_GROWTH_FACTOR, Double.class); + } + + public static DataSize getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN, 
DataSize.class); + } + + public static DataSize getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MAX, DataSize.class); + } + + public static DataSize getFaultTolerantExecutionHashDistributionComputeTaskTargetSize(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE, DataSize.class); + } + + public static DataSize getFaultTolerantExecutionHashDistributionWriteTaskTargetSize(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE, DataSize.class); + } + + public static int getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(Session session) { - return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, DataSize.class); + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_MAX_COUNT, Integer.class); } - public static int getFaultTolerantExecutionTargetTaskSplitCount(Session session) + public static DataSize getFaultTolerantExecutionStandardSplitSize(Session session) { - return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT, Integer.class); + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_STANDARD_SPLIT_SIZE, DataSize.class); } public static int getFaultTolerantExecutionMaxTaskSplitCount(Session session) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 095886464be2..5bd268b92289 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -19,6 +19,7 @@ import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import 
io.airlift.units.Duration; +import io.airlift.units.MinDataSize; +import io.airlift.units.MinDuration; import io.trino.operator.RetryPolicy; @@ -31,6 +32,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.units.DataSize.Unit.GIGABYTE; +import static io.airlift.units.DataSize.Unit.MEGABYTE; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -42,6 +44,7 @@ "experimental.max-queued-big-queries", "query-manager.initialization-required-workers", "query-manager.initialization-timeout", + "fault-tolerant-execution-target-task-split-count", "query.remote-task.max-consecutive-error-count"}) public class QueryManagerConfig { @@ -95,13 +98,25 @@ public class QueryManagerConfig private int maxTasksWaitingForNodePerStage = 5; private boolean enabledAdaptiveTaskRequestSize = true; - private DataSize maxRemoteTaskRequestSize = DataSize.of(8, DataSize.Unit.MEGABYTE); - private DataSize remoteTaskRequestSizeHeadroom = DataSize.of(2, DataSize.Unit.MEGABYTE); + private DataSize maxRemoteTaskRequestSize = DataSize.of(8, MEGABYTE); + private DataSize remoteTaskRequestSizeHeadroom = DataSize.of(2, MEGABYTE); private int remoteTaskGuaranteedSplitPerTask = 3; - private DataSize faultTolerantExecutionTargetTaskInputSize = DataSize.of(4, GIGABYTE); + private int faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod = 64; + private double faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor = 1.2; + private DataSize faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin = DataSize.of(512, MEGABYTE); + private DataSize faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax = DataSize.of(50, GIGABYTE); - private int faultTolerantExecutionTargetTaskSplitCount = 64; + private int faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod = 64; + private double 
faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor = 1.2; + private DataSize faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin = DataSize.of(4, GIGABYTE); + private DataSize faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax = DataSize.of(50, GIGABYTE); + + private DataSize faultTolerantExecutionHashDistributionComputeTaskTargetSize = DataSize.of(512, MEGABYTE); + private DataSize faultTolerantExecutionHashDistributionWriteTaskTargetSize = DataSize.of(4, GIGABYTE); + private int faultTolerantExecutionHashDistributionWriteTaskTargetMaxCount = 2000; + + private DataSize faultTolerantExecutionStandardSplitSize = DataSize.of(64, MEGABYTE); private int faultTolerantExecutionMaxTaskSplitCount = 256; private DataSize faultTolerantExecutionTaskDescriptorStorageMaxMemory = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15)); private int faultTolerantExecutionPartitionCount = 50; @@ -638,31 +653,167 @@ public QueryManagerConfig setRemoteTaskGuaranteedSplitPerTask(int remoteTaskGuar return this; } + public int getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod() + { + return faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period") + @ConfigDescription("The number of tasks we create for given non-writer stage of arbitrary distribution before we increase task size") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod) + { + this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod; + return this; + } + + public double getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor() + { + return 
faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor") + @ConfigDescription("Growth factor for adaptive sizing of non-writer tasks of arbitrary distribution for fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(double faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor) + { + this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor; + return this; + } + @NotNull - public DataSize getFaultTolerantExecutionTargetTaskInputSize() + public DataSize getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin() { - return faultTolerantExecutionTargetTaskInputSize; + return faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin; } - @Config("fault-tolerant-execution-target-task-input-size") - @ConfigDescription("Target size in bytes of all task inputs for a single fault tolerant task") - public QueryManagerConfig setFaultTolerantExecutionTargetTaskInputSize(DataSize faultTolerantExecutionTargetTaskInputSize) + @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min") + @ConfigDescription("Initial/min target input size for non-writer tasks of arbitrary distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(DataSize faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin) { - this.faultTolerantExecutionTargetTaskInputSize = faultTolerantExecutionTargetTaskInputSize; + this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin; + return this; + } + + @NotNull + public DataSize 
getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax() + { + return faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max") + @ConfigDescription("Max target input size for non-writer task of arbitrary distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(DataSize faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax) + { + this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax; + return this; + } + + public int getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod() + { + return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period") + @ConfigDescription("The number of tasks we create for given writer stage of arbitrary distribution before we increase task size") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod) + { + this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod; + return this; + } + + public double getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor() + { + return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor") + @ConfigDescription("Growth factor for adaptive sizing of writer tasks of arbitrary distribution for fault-tolerant execution") + public QueryManagerConfig 
setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(double faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor) + { + this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor; + return this; + } + + @NotNull + public DataSize getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin() + { + return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min") + @ConfigDescription("Initial/min target input size for writer tasks of arbitrary distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(DataSize faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin) + { + this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin; + return this; + } + + @NotNull + public DataSize getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax() + { + return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax; + } + + @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max") + @ConfigDescription("Max target input size for writer tasks of arbitrary distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(DataSize faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax) + { + this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax; + return this; + } + + @NotNull + public DataSize getFaultTolerantExecutionHashDistributionComputeTaskTargetSize() + { + return faultTolerantExecutionHashDistributionComputeTaskTargetSize; + } + + 
@Config("fault-tolerant-execution-hash-distribution-compute-task-target-size") + @ConfigDescription("Target input size for non-writer tasks of hash distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionHashDistributionComputeTaskTargetSize(DataSize faultTolerantExecutionHashDistributionComputeTaskTargetSize) + { + this.faultTolerantExecutionHashDistributionComputeTaskTargetSize = faultTolerantExecutionHashDistributionComputeTaskTargetSize; + return this; + } + + @NotNull + public DataSize getFaultTolerantExecutionHashDistributionWriteTaskTargetSize() + { + return faultTolerantExecutionHashDistributionWriteTaskTargetSize; + } + + @Config("fault-tolerant-execution-hash-distribution-write-task-target-size") + @ConfigDescription("Target input size of writer tasks of hash distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionHashDistributionWriteTaskTargetSize(DataSize faultTolerantExecutionHashDistributionWriteTaskTargetSize) + { + this.faultTolerantExecutionHashDistributionWriteTaskTargetSize = faultTolerantExecutionHashDistributionWriteTaskTargetSize; return this; } @Min(1) - public int getFaultTolerantExecutionTargetTaskSplitCount() + public int getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount() + { + return faultTolerantExecutionHashDistributionWriteTaskTargetMaxCount; + } + + @Config("fault-tolerant-execution-hash-distribution-write-task-target-max-count") + @ConfigDescription("Soft upper bound on number of writer tasks in a stage of hash distribution of fault-tolerant execution") + public QueryManagerConfig setFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(int faultTolerantExecutionHashDistributionWriteTaskTargetMaxCount) + { + this.faultTolerantExecutionHashDistributionWriteTaskTargetMaxCount = faultTolerantExecutionHashDistributionWriteTaskTargetMaxCount; + return this; + } + + @MinDataSize("1MB") + public DataSize 
getFaultTolerantExecutionStandardSplitSize() { - return faultTolerantExecutionTargetTaskSplitCount; + return faultTolerantExecutionStandardSplitSize; } - @Config("fault-tolerant-execution-target-task-split-count") - @ConfigDescription("Target number of splits for a single fault tolerant task (split weight aware)") - public QueryManagerConfig setFaultTolerantExecutionTargetTaskSplitCount(int faultTolerantExecutionTargetTaskSplitCount) + @Config("fault-tolerant-execution-standard-split-size") + @ConfigDescription("Standard split size for a single fault tolerant task (split weight aware)") + public QueryManagerConfig setFaultTolerantExecutionStandardSplitSize(DataSize faultTolerantExecutionStandardSplitSize) { - this.faultTolerantExecutionTargetTaskSplitCount = faultTolerantExecutionTargetTaskSplitCount; + this.faultTolerantExecutionStandardSplitSize = faultTolerantExecutionStandardSplitSize; return this; } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java index c1398c446fc8..d34fd273d550 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ArbitraryDistributionSplitAssigner.java @@ -38,6 +38,8 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static io.trino.operator.ExchangeOperator.REMOTE_CATALOG_HANDLE; +import static java.lang.Math.ceil; +import static java.lang.Math.min; import static java.lang.Math.round; import static java.util.Objects.requireNonNull; @@ -48,11 +50,16 @@ class ArbitraryDistributionSplitAssigner private final Set partitionedSources; private final Set replicatedSources; private final Set allSources; - private final long targetPartitionSizeInBytes; + private final int adaptiveGrowthPeriod; + 
private final double adaptiveGrowthFactor; + private final long minTargetPartitionSizeInBytes; + private final long maxTargetPartitionSizeInBytes; private final long standardSplitSizeInBytes; private final int maxTaskSplitCount; private int nextPartitionId; + private int adaptiveCounter; + private long targetPartitionSizeInBytes; private final List allAssignments = new ArrayList<>(); private final Map, PartitionAssignment> openAssignments = new HashMap<>(); @@ -65,7 +72,10 @@ class ArbitraryDistributionSplitAssigner Optional catalogRequirement, Set partitionedSources, Set replicatedSources, - long targetPartitionSizeInBytes, + int adaptiveGrowthPeriod, + double adaptiveGrowthFactor, + long minTargetPartitionSizeInBytes, + long maxTargetPartitionSizeInBytes, long standardSplitSizeInBytes, int maxTaskSplitCount) { @@ -76,9 +86,14 @@ class ArbitraryDistributionSplitAssigner .addAll(partitionedSources) .addAll(replicatedSources) .build(); - this.targetPartitionSizeInBytes = targetPartitionSizeInBytes; + this.adaptiveGrowthPeriod = adaptiveGrowthPeriod; + this.adaptiveGrowthFactor = adaptiveGrowthFactor; + this.minTargetPartitionSizeInBytes = minTargetPartitionSizeInBytes; + this.maxTargetPartitionSizeInBytes = maxTargetPartitionSizeInBytes; this.standardSplitSizeInBytes = standardSplitSizeInBytes; this.maxTaskSplitCount = maxTaskSplitCount; + + this.targetPartitionSizeInBytes = minTargetPartitionSizeInBytes; } @Override @@ -196,6 +211,14 @@ private AssignmentResult assignPartitionedSplits(PlanNodeId planNodeId, List= adaptiveGrowthPeriod) { + targetPartitionSizeInBytes = (long) min(maxTargetPartitionSizeInBytes, ceil(targetPartitionSizeInBytes * adaptiveGrowthFactor)); + // round to a multiple of minTargetPartitionSizeInBytes so work will be evenly distributed among drivers of a task + targetPartitionSizeInBytes = (targetPartitionSizeInBytes + minTargetPartitionSizeInBytes - 1) / minTargetPartitionSizeInBytes * minTargetPartitionSizeInBytes; + adaptiveCounter = 0; + } 
} if (partitionAssignment == null) { partitionAssignment = new PartitionAssignment(nextPartitionId++); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java index b5dff6439979..2ea724d34985 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/EventDrivenTaskSourceFactory.java @@ -40,11 +40,22 @@ import java.util.concurrent.ExecutorService; import java.util.function.LongConsumer; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionHashDistributionComputeTaskTargetSize; +import static 
io.trino.SystemSessionProperties.getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionHashDistributionWriteTaskTargetSize; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionMaxTaskSplitCount; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTargetTaskInputSize; -import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTargetTaskSplitCount; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionStandardSplitSize; import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION; @@ -107,9 +118,7 @@ public EventDrivenTaskSource create( remoteSources.put(remoteSource.getId(), sourceFragment); } } - long targetPartitionSizeInBytes = getFaultTolerantExecutionTargetTaskInputSize(session).toBytes(); - // TODO: refactor to define explicitly - long standardSplitSizeInBytes = targetPartitionSizeInBytes / getFaultTolerantExecutionTargetTaskSplitCount(session); + long standardSplitSizeInBytes = getFaultTolerantExecutionStandardSplitSize(session).toBytes(); int maxTaskSplitCount = getFaultTolerantExecutionMaxTaskSplitCount(session); return new EventDrivenTaskSource( session.getQueryId(), @@ -122,7 +131,6 @@ public EventDrivenTaskSource create( fragment, outputDataSizeEstimates, sourcePartitioningScheme, - targetPartitionSizeInBytes, standardSplitSizeInBytes, maxTaskSplitCount), executor, @@ -137,7 +145,6 @@ private SplitAssigner createSplitAssigner( PlanFragment fragment, Map outputDataSizeEstimates, FaultTolerantPartitioningScheme sourcePartitioningScheme, - long targetPartitionSizeInBytes, long standardSplitSizeInBytes, int maxArbitraryDistributionTaskSplitCount) { @@ -171,18 +178,61 @@ private SplitAssigner 
createSplitAssigner( .addAll(replicatedSources) .build()); } - if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION) || partitioning.equals(SOURCE_DISTRIBUTION)) { + + int arbitraryDistributionComputeTaskTargetSizeGrowthPeriod = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(session); + double arbitraryDistributionComputeTaskTargetSizeGrowthFactor = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(session); + long arbitraryDistributionComputeTaskTargetSizeInBytesMin = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(session).toBytes(); + long arbitraryDistributionComputeTaskTargetSizeInBytesMax = getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(session).toBytes(); + checkArgument(arbitraryDistributionComputeTaskTargetSizeInBytesMax >= arbitraryDistributionComputeTaskTargetSizeInBytesMin, + "arbitraryDistributionComputeTaskTargetSizeInBytesMax %s should be no smaller than arbitraryDistributionComputeTaskTargetSizeInBytesMin %s", + arbitraryDistributionComputeTaskTargetSizeInBytesMax, arbitraryDistributionComputeTaskTargetSizeInBytesMin); + + int arbitraryDistributionWriteTaskTargetSizeGrowthPeriod = getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(session); + double arbitraryDistributionWriteTaskTargetSizeGrowthFactor = getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(session); + long arbitraryDistributionWriteTaskTargetSizeInBytesMin = getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(session).toBytes(); + long arbitraryDistributionWriteTaskTargetSizeInBytesMax = getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(session).toBytes(); + checkArgument(arbitraryDistributionWriteTaskTargetSizeInBytesMax >= arbitraryDistributionWriteTaskTargetSizeInBytesMin, + 
"arbitraryDistributionWriteTaskTargetSizeInBytesMax %s should be no smaller than arbitraryDistributionWriteTaskTargetSizeInBytesMin %s", + arbitraryDistributionWriteTaskTargetSizeInBytesMax, arbitraryDistributionWriteTaskTargetSizeInBytesMin); + + if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SOURCE_DISTRIBUTION)) { + return new ArbitraryDistributionSplitAssigner( + partitioning.getCatalogHandle(), + partitionedSources, + replicatedSources, - targetPartitionSizeInBytes, + arbitraryDistributionComputeTaskTargetSizeGrowthPeriod, + arbitraryDistributionComputeTaskTargetSizeGrowthFactor, + arbitraryDistributionComputeTaskTargetSizeInBytesMin, + arbitraryDistributionComputeTaskTargetSizeInBytesMax, + standardSplitSizeInBytes, + maxArbitraryDistributionTaskSplitCount); + } + + if (partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) { + return new ArbitraryDistributionSplitAssigner( + partitioning.getCatalogHandle(), + partitionedSources, + replicatedSources, + arbitraryDistributionWriteTaskTargetSizeGrowthPeriod, + arbitraryDistributionWriteTaskTargetSizeGrowthFactor, + arbitraryDistributionWriteTaskTargetSizeInBytesMin, + arbitraryDistributionWriteTaskTargetSizeInBytesMax, standardSplitSizeInBytes, maxArbitraryDistributionTaskSplitCount); } if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() || - (partitioning.getConnectorHandle() instanceof MergePartitioningHandle) || - partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)) { + (partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) { + return HashDistributionSplitAssigner.create( + partitioning.getCatalogHandle(), + partitionedSources, + replicatedSources, + sourcePartitioningScheme, + outputDataSizeEstimates, + fragment, + getFaultTolerantExecutionHashDistributionComputeTaskTargetSize(session).toBytes(), + Integer.MAX_VALUE); // compute tasks are bounded by the number of partitions anyway + } + if 
(partitioning.equals(SCALED_WRITER_HASH_DISTRIBUTION)) { return HashDistributionSplitAssigner.create( partitioning.getCatalogHandle(), partitionedSources, @@ -190,7 +240,8 @@ private SplitAssigner createSplitAssigner( sourcePartitioningScheme, outputDataSizeEstimates, fragment, - getFaultTolerantExecutionTargetTaskInputSize(session).toBytes()); + getFaultTolerantExecutionHashDistributionWriteTaskTargetSize(session).toBytes(), + getFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(session)); } // other partitioning handles are not expected to be set as a fragment partitioning diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java index 25a49a798d59..254c25162124 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/HashDistributionSplitAssigner.java @@ -71,7 +71,8 @@ public static HashDistributionSplitAssigner create( FaultTolerantPartitioningScheme sourcePartitioningScheme, Map outputDataSizeEstimates, PlanFragment fragment, - long targetPartitionSizeInBytes) + long targetPartitionSizeInBytes, + int targetMaxTaskCount) { if (fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION)) { verify( @@ -89,6 +90,7 @@ public static HashDistributionSplitAssigner create( partitionedSources, outputDataSizeEstimates, targetPartitionSizeInBytes, + targetMaxTaskCount, sourceId -> fragment.getPartitioning().equals(SCALED_WRITER_HASH_DISTRIBUTION), // never merge partitions for table write to avoid running into the maximum writers limit per task !isWriteFragment(fragment))); @@ -205,6 +207,7 @@ static Map createOutputPartitionToTaskPartition( Set partitionedSources, Map outputDataSizeEstimates, long targetPartitionSizeInBytes, + int targetMaxTaskCount, Predicate canSplit, boolean canMerge) { @@ 
-223,6 +226,20 @@ static Map createOutputPartitionToTaskPartition( .map(Map.Entry::getValue) .collect(toImmutableList()); OutputDataSizeEstimate mergedEstimate = OutputDataSizeEstimate.merge(partitionedSourcesEstimates); + + // adjust targetPartitionSizeInBytes based on total input bytes + if (targetMaxTaskCount != Integer.MAX_VALUE) { + long totalBytes = 0; + for (int partitionId = 0; partitionId < partitionCount; partitionId++) { + totalBytes += mergedEstimate.getPartitionSizeInBytes(partitionId); + } + if (totalBytes / targetPartitionSizeInBytes > targetMaxTaskCount) { + // targetMaxTaskCount is only used to adjust targetPartitionSizeInBytes to avoid excessive number + // of tasks; actual number of tasks depend on the data size distribution and may exceed its value + targetPartitionSizeInBytes = (totalBytes + targetMaxTaskCount - 1) / targetMaxTaskCount; + } + } + ImmutableMap.Builder result = ImmutableMap.builder(); PriorityQueue assignments = new PriorityQueue<>(); for (int partitionId = 0; partitionId < partitionCount; partitionId++) { diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java index 4c4e9d7d8812..2bee2a3c5568 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryManagerConfig.java @@ -77,8 +77,18 @@ public void testDefaults() .setMaxRemoteTaskRequestSize(DataSize.of(8, DataSize.Unit.MEGABYTE)) .setRemoteTaskRequestSizeHeadroom(DataSize.of(2, DataSize.Unit.MEGABYTE)) .setRemoteTaskGuaranteedSplitPerTask(3) - .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(4, GIGABYTE)) - .setFaultTolerantExecutionTargetTaskSplitCount(64) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(64) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(1.2) + 
.setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(DataSize.of(512, MEGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(DataSize.of(50, GIGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(64) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(1.2) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(DataSize.of(4, GIGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(DataSize.of(50, GIGABYTE)) + .setFaultTolerantExecutionHashDistributionComputeTaskTargetSize(DataSize.of(512, MEGABYTE)) + .setFaultTolerantExecutionHashDistributionWriteTaskTargetSize(DataSize.of(4, GIGABYTE)) + .setFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(2000) + .setFaultTolerantExecutionStandardSplitSize(DataSize.of(64, MEGABYTE)) .setFaultTolerantExecutionMaxTaskSplitCount(256) .setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.15))) .setFaultTolerantExecutionPartitionCount(50) @@ -128,8 +138,18 @@ public void testExplicitPropertyMappings() .put("query.remote-task.max-request-size", "10MB") .put("query.remote-task.request-size-headroom", "1MB") .put("query.remote-task.guaranteed-splits-per-task", "5") - .put("fault-tolerant-execution-target-task-input-size", "222MB") - .put("fault-tolerant-execution-target-task-split-count", "3") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period", "11") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor", "2.2") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min", "555MB") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max", "5GB") + .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period", "25") + 
.put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor", "2.5") + .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min", "6GB") + .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max", "10GB") + .put("fault-tolerant-execution-hash-distribution-compute-task-target-size", "1GB") + .put("fault-tolerant-execution-hash-distribution-write-task-target-size", "7GB") + .put("fault-tolerant-execution-hash-distribution-write-task-target-max-count", "5000") + .put("fault-tolerant-execution-standard-split-size", "33MB") .put("fault-tolerant-execution-max-task-split-count", "22") .put("fault-tolerant-execution-task-descriptor-storage-max-memory", "3GB") .put("fault-tolerant-execution-partition-count", "123") @@ -176,8 +196,18 @@ public void testExplicitPropertyMappings() .setMaxRemoteTaskRequestSize(DataSize.of(10, DataSize.Unit.MEGABYTE)) .setRemoteTaskRequestSizeHeadroom(DataSize.of(1, DataSize.Unit.MEGABYTE)) .setRemoteTaskGuaranteedSplitPerTask(5) - .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(222, MEGABYTE)) - .setFaultTolerantExecutionTargetTaskSplitCount(3) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(11) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor(2.2) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMin(DataSize.of(555, MEGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeMax(DataSize.of(5, GIGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(25) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor(2.5) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMin(DataSize.of(6, GIGABYTE)) + .setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeMax(DataSize.of(10, GIGABYTE)) + .setFaultTolerantExecutionHashDistributionComputeTaskTargetSize(DataSize.of(1, 
GIGABYTE)) + .setFaultTolerantExecutionHashDistributionWriteTaskTargetSize(DataSize.of(7, GIGABYTE)) + .setFaultTolerantExecutionHashDistributionWriteTaskTargetMaxCount(5000) + .setFaultTolerantExecutionStandardSplitSize(DataSize.of(33, MEGABYTE)) .setFaultTolerantExecutionMaxTaskSplitCount(22) .setFaultTolerantExecutionTaskDescriptorStorageMaxMemory(DataSize.of(3, GIGABYTE)) .setFaultTolerantExecutionPartitionCount(123) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestArbitraryDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestArbitraryDistributionSplitAssigner.java index 8f1015ef75b2..2aa6744174b9 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestArbitraryDistributionSplitAssigner.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestArbitraryDistributionSplitAssigner.java @@ -447,6 +447,71 @@ public void fuzzTestingWithHostRequirement() } } + @Test + public void testAdaptiveTaskSizing() + { + Set partitionedSources = ImmutableSet.of(PARTITIONED_1); + List batches = ImmutableList.of( + new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(1), createSplit(2), createSplit(3)), false), + new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(4), createSplit(5), createSplit(6)), false), + new SplitBatch(PARTITIONED_1, ImmutableList.of(createSplit(7), createSplit(8), createSplit(9)), true)); + SplitAssigner splitAssigner = new ArbitraryDistributionSplitAssigner( + Optional.of(TEST_CATALOG_HANDLE), + partitionedSources, + ImmutableSet.of(), + 1, + 1.2, + 1, + 4, + STANDARD_SPLIT_SIZE_IN_BYTES, + 5); + SplitAssignerTester tester = new SplitAssignerTester(); + for (SplitBatch batch : batches) { + PlanNodeId planNodeId = batch.getPlanNodeId(); + List splits = batch.getSplits(); + boolean noMoreSplits = batch.isNoMoreSplits(); + tester.update(splitAssigner.assign(planNodeId, createSplitsMultimap(splits), noMoreSplits)); + 
tester.checkContainsSplits(planNodeId, splits, false); + } + tester.update(splitAssigner.finish()); + List taskDescriptors = tester.getTaskDescriptors().orElseThrow(); + assertThat(taskDescriptors).hasSize(4); + + TaskDescriptor taskDescriptor0 = taskDescriptors.get(0); + assertTaskDescriptor( + taskDescriptor0, + taskDescriptor0.getPartitionId(), + ImmutableListMultimap.builder() + .put(PARTITIONED_1, createSplit(1)) + .build()); + TaskDescriptor taskDescriptor1 = taskDescriptors.get(1); + assertTaskDescriptor( + taskDescriptor1, + taskDescriptor1.getPartitionId(), + ImmutableListMultimap.builder() + .put(PARTITIONED_1, createSplit(2)) + .put(PARTITIONED_1, createSplit(3)) + .build()); + TaskDescriptor taskDescriptor2 = taskDescriptors.get(2); + assertTaskDescriptor( + taskDescriptor2, + taskDescriptor2.getPartitionId(), + ImmutableListMultimap.builder() + .put(PARTITIONED_1, createSplit(4)) + .put(PARTITIONED_1, createSplit(5)) + .put(PARTITIONED_1, createSplit(6)) + .build()); + TaskDescriptor taskDescriptor3 = taskDescriptors.get(3); + assertTaskDescriptor( + taskDescriptor3, + taskDescriptor3.getPartitionId(), + ImmutableListMultimap.builder() + .put(PARTITIONED_1, createSplit(7)) + .put(PARTITIONED_1, createSplit(8)) + .put(PARTITIONED_1, createSplit(9)) + .build()); + } + private void fuzzTesting(boolean withHostRequirements) { Set partitionedSources = new HashSet<>(); @@ -667,6 +732,9 @@ private static ArbitraryDistributionSplitAssigner createSplitAssigner( Optional.of(TEST_CATALOG_HANDLE), partitionedSources, replicatedSources, + Integer.MAX_VALUE, + 1.0, + targetPartitionSizeInBytes, targetPartitionSizeInBytes, STANDARD_SPLIT_SIZE_IN_BYTES, maxTaskSplitCount); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java index 1a6e31bd122e..e21eb34ad1cc 100644 --- 
a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestHashDistributionSplitAssigner.java @@ -347,6 +347,7 @@ public void testPartitionSplitting() .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); + // multiple sources testAssigner() .withPartitionedSources(PARTITIONED_1, PARTITIONED_2) @@ -393,6 +394,24 @@ PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) .withMergeAllowed(true) .withExpectedTaskCount(1) .run(); + + // targetPartitionSizeInBytes re-adjustment based on taskTargetMaxCount + testAssigner() + .withPartitionedSources(PARTITIONED_1, PARTITIONED_2) + .withSplits( + new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(0, 0), createSplit(1, 0)), false), + new SplitBatch(PARTITIONED_1, createSplitMap(createSplit(2, 0), createSplit(3, 0)), true), + new SplitBatch(PARTITIONED_2, createSplitMap(createSplit(4, 0), createSplit(5, 1)), true)) + .withSplitPartitionCount(3) + .withTargetPartitionSizeInBytes(30) + .withTaskTargetMaxCount(10) + .withOutputDataSizeEstimates(ImmutableMap.of( + PARTITIONED_1, new OutputDataSizeEstimate(ImmutableLongArray.of(1000, 1, 1)), + PARTITIONED_2, new OutputDataSizeEstimate(ImmutableLongArray.of(2, 1, 1)))) + .withSplittableSources(PARTITIONED_1, PARTITIONED_2) + .withMergeAllowed(true) + .withExpectedTaskCount(12) + .run(); } @Test @@ -524,6 +543,7 @@ private static class AssignerTester private int splitPartitionCount; private Optional> partitionToNodeMap = Optional.empty(); private long targetPartitionSizeInBytes; + private int taskTargetMaxCount = Integer.MAX_VALUE; private Map outputDataSizeEstimates = ImmutableMap.of(); private Set splittableSources = ImmutableSet.of(); private boolean mergeAllowed; @@ -565,6 +585,12 @@ public AssignerTester withTargetPartitionSizeInBytes(long targetPartitionSizeInB return this; } + public AssignerTester withTaskTargetMaxCount(int 
taskTargetMaxCount) + { + this.taskTargetMaxCount = taskTargetMaxCount; + return this; + } + public AssignerTester withOutputDataSizeEstimates(Map outputDataSizeEstimates) { this.outputDataSizeEstimates = outputDataSizeEstimates; @@ -597,6 +623,7 @@ public void run() partitionedSources, outputDataSizeEstimates, targetPartitionSizeInBytes, + taskTargetMaxCount, splittableSources::contains, mergeAllowed); HashDistributionSplitAssigner assigner = new HashDistributionSplitAssigner( @@ -744,6 +771,7 @@ public void run() partitionedSources, outputDataSizeEstimates, targetPartitionSizeInBytes, + Integer.MAX_VALUE, splittableSources::contains, mergeAllowed); Set actualGroups = extractMappings(actual); diff --git a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java index faac87f60d02..1aa0c93df658 100644 --- a/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java +++ b/plugin/trino-exchange-filesystem/src/test/java/io/trino/plugin/exchange/filesystem/containers/MinioStorage.java @@ -93,7 +93,7 @@ public static Map getExchangeManagerProperties(MinioStorage mini .put("exchange.s3.aws-secret-key", MinioStorage.SECRET_KEY) .put("exchange.s3.region", "us-east-1") .put("exchange.s3.endpoint", "http://" + minioStorage.getMinio().getMinioApiEndpoint()) - // create more granular source handles given the fault-tolerant-execution-target-task-input-size is set to lower value for testing + // create more granular source handles given the fault-tolerant execution target task input size is set to lower value for testing .put("exchange.source-handle-target-data-size", "1MB") .buildOrThrow(); } diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java 
b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index f66cfadeb701..a3ecf8e26c80 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -105,7 +105,12 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.SystemSessionProperties.COLOCATED_JOIN; import static io.trino.SystemSessionProperties.ENABLE_DYNAMIC_FILTERING; -import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MAX; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MIN; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MAX; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE; +import static io.trino.SystemSessionProperties.FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE; import static io.trino.SystemSessionProperties.MAX_WRITER_TASKS_COUNT; import static io.trino.SystemSessionProperties.PREFERRED_WRITE_PARTITIONING_MIN_NUMBER_OF_PARTITIONS; import static io.trino.SystemSessionProperties.REDISTRIBUTE_WRITES; @@ -4162,7 +4167,12 @@ protected AbstractLongAssert testTaskScaleWriters( .setSystemProperty(TASK_SCALE_WRITERS_MAX_WRITER_COUNT, String.valueOf(taskMaxScaleWriterCount)) // Set the value higher than sf1 input data size such that fault-tolerant scheduler // shouldn't add new task and scaling only happens through the local scaling exchange. 
- .setSystemProperty(FAULT_TOLERANT_EXECUTION_TARGET_TASK_INPUT_SIZE, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MIN, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE_MAX, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MIN, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_ARBITRARY_DISTRIBUTION_WRITE_TASK_TARGET_SIZE_MAX, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_COMPUTE_TASK_TARGET_SIZE, "2GB") + .setSystemProperty(FAULT_TOLERANT_EXECUTION_HASH_DISTRIBUTION_WRITE_TASK_TARGET_SIZE, "2GB") // Set the value of orc strip size low to increase the frequency at which // physicalWrittenDataSize is updated through ConnectorPageSink#getCompletedBytes() .setCatalogSessionProperty(catalog, "orc_optimized_writer_min_stripe_size", "2MB") diff --git a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFaultTolerantExecutionTest.java b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFaultTolerantExecutionTest.java index f0a3636954ce..ee40714e3e1f 100644 --- a/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFaultTolerantExecutionTest.java +++ b/testing/trino-faulttolerant-tests/src/test/java/io/trino/faulttolerant/BaseFaultTolerantExecutionTest.java @@ -110,7 +110,12 @@ private static Session withSingleWriterPerTask(Session session) private static Session withUnlimitedTargetTaskInputSize(Session session) { return Session.builder(session) - .setSystemProperty("fault_tolerant_execution_target_task_input_size", "1PB") + .setSystemProperty("fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_min", "1PB") + .setSystemProperty("fault_tolerant_execution_arbitrary_distribution_compute_task_target_size_max", "1PB") + 
.setSystemProperty("fault_tolerant_execution_arbitrary_distribution_write_task_target_size_min", "1PB") + .setSystemProperty("fault_tolerant_execution_arbitrary_distribution_write_task_target_size_max", "1PB") + .setSystemProperty("fault_tolerant_execution_hash_distribution_compute_task_target_size", "1PB") + .setSystemProperty("fault_tolerant_execution_hash_distribution_write_task_target_size", "1PB") .build(); } diff --git a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java index 48c1af0e5d7e..e1ab7038b761 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/FaultTolerantExecutionConnectorTestHelper.java @@ -28,8 +28,13 @@ public static Map getExtraProperties() .put("retry-initial-delay", "50ms") .put("retry-max-delay", "100ms") .put("fault-tolerant-execution-partition-count", "5") - .put("fault-tolerant-execution-target-task-input-size", "10MB") - .put("fault-tolerant-execution-target-task-split-count", "4") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min", "5MB") + .put("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max", "10MB") + .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min", "10MB") + .put("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max", "20MB") + .put("fault-tolerant-execution-hash-distribution-compute-task-target-size", "5MB") + .put("fault-tolerant-execution-hash-distribution-write-task-target-size", "10MB") + .put("fault-tolerant-execution-standard-split-size", "2.5MB") // to trigger spilling .put("exchange.deduplication-buffer-size", "1kB") .put("fault-tolerant-execution-task-memory", "1GB")