From 62d142d6461b0dc6ab42cfbb740711ce90e9f8be Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Sat, 22 Apr 2023 23:13:36 -0400 Subject: [PATCH 1/4] Add lower bounds to growth factor of adaptive task sizing --- .../src/main/java/io/trino/execution/QueryManagerConfig.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 6806c3f21775..a6ab92e6c7de 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -711,6 +711,7 @@ public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeT return this; } + @Min(1) public double getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor() { return faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor; @@ -765,6 +766,7 @@ public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTas return this; } + @Min(1) public double getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor() { return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor; From 09db7a0522c9d3257a741e81becb846522ec8e95 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Fri, 21 Apr 2023 14:43:42 -0400 Subject: [PATCH 2/4] Add fault-tolerant-execution-target-task-input-size to defunct configs --- .../src/main/java/io/trino/execution/QueryManagerConfig.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index a6ab92e6c7de..e5e63b758ea4 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -44,7 +44,8 @@ "experimental.max-queued-big-queries", "query-manager.initialization-required-workers", "query-manager.initialization-timeout", - " fault-tolerant-execution-target-task-split-count", + "fault-tolerant-execution-target-task-split-count", + "fault-tolerant-execution-target-task-input-size", "query.remote-task.max-consecutive-error-count"}) public class QueryManagerConfig { From 0d6a3c7ef838b4f15125855a260f0913f85440f4 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Fri, 21 Apr 2023 12:06:39 -0400 Subject: [PATCH 3/4] Update docs for adaptive partitioning of FTE --- .../sphinx/admin/fault-tolerant-execution.rst | 37 ++++++++++++++----- .../admin/properties-query-management.rst | 20 ++++++++++ 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index 8dac7dc6f773..934e97f5921d 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -305,15 +305,34 @@ fault-tolerant execution: reschedule tasks in case of a failure. - (JVM heap size * 0.15) - Only ``TASK`` - * - ``fault-tolerant-execution-partition-count`` - - Number of partitions to use for distributed joins and aggregations, - similar in function to the ``query.hash-partition-count`` :doc:`query - management property `. It is not - recommended to increase this property value above the default of ``50``, - which may result in instability and poor performance. May be overridden - for the current session with the - ``fault_tolerant_execution_partition_count`` :ref:`session property - `. + * - ``fault-tolerant-execution-max-partition-count`` + - Maximum number of partitions to use for distributed joins and + aggregations, similar in function to the + ``query.max-hash-partition-count`` :doc:`query management property + `. It is not recommended to increase + this property value above the default of ``50``, which may result in + instability and poor performance. May be overridden for the current + session with the ``fault_tolerant_execution_max_partition_count`` + :ref:`session property `. + - ``50`` + - Only ``TASK`` + * - ``fault-tolerant-execution-min-partition-count`` + - Minimum number of partitions to use for distributed joins and + aggregations, similar in function to the + ``query.min-hash-partition-count`` :doc:`query management property + `. May be overridden for the current + session with the ``fault_tolerant_execution_min_partition_count`` + :ref:`session property `. + - ``4`` + - Only ``TASK`` + * - ``fault-tolerant-execution-min-partition-count-for-write`` + - Minimum number of partitions to use for distributed joins and + aggregations in write queries, similar in function to the + ``query.min-hash-partition-count-for-write`` :doc:`query management + property `. May be overridden for + the current session with the + ``fault_tolerant_execution_min_partition_count_for_write`` + :ref:`session property `. - ``50`` - Only ``TASK`` * - ``max-tasks-waiting-for-node-per-stage`` diff --git a/docs/src/main/sphinx/admin/properties-query-management.rst b/docs/src/main/sphinx/admin/properties-query-management.rst index a5d7e754e22e..b6ae52a6868d 100644 --- a/docs/src/main/sphinx/admin/properties-query-management.rst +++ b/docs/src/main/sphinx/admin/properties-query-management.rst @@ -29,6 +29,16 @@ stages of a query. You can use the following execution policies: dependencies typically prevent full processing and cause longer queue times which increases the query wall time overall. +``query.determine-partition-count-for-write-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-boolean` +* **Default value:** ``false`` +* **Session property:** ``determine_partition_count_for_write_enabled`` + +Enables determining the number of partitions based on amount of data read and processed by the +query for write queries. + ``query.max-hash-partition-count`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -49,6 +59,16 @@ joins, aggregations, partitioned window functions and others. The minimum number of partitions to use for processing distributed operations, such as joins, aggregations, partitioned window functions and others. +``query.min-hash-partition-count-for-write`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-integer` +* **Default value:** ``50`` +* **Session property:** ``min_hash_partition_count_for_writre`` + +The minimum number of partitions to use for processing distributed operations in write queries, +such as joins, aggregations, partitioned window functions and others. + ``query.max-writer-tasks-count`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ From 51d0dfe22626237e2b3ad972409b6550ff88ed6e Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Fri, 21 Apr 2023 12:22:12 -0400 Subject: [PATCH 4/4] Update docs for adaptive task sizing of FTE --- .../trino/execution/QueryManagerConfig.java | 4 +- .../sphinx/admin/fault-tolerant-execution.rst | 74 ++++++++++++++----- 2 files changed, 58 insertions(+), 20 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index e5e63b758ea4..52f901d0c71d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -705,7 +705,7 @@ public int getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGr } @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period") - @ConfigDescription("The number of tasks we create for given non-writer stage of arbitrary distribution before we increase task size") + @ConfigDescription("The number of tasks created for any given non-writer stage of arbitrary distribution before task size is increased") public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod) { this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod; @@ -760,7 +760,7 @@ public int getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrow } @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period") - @ConfigDescription("The number of tasks we create for given writer stage of arbitrary distribution before we increase task size") + @ConfigDescription("The number of tasks created for any given writer stage of arbitrary distribution before task size is increased") public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod) { this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod; diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index 934e97f5921d..c7934a452ecb 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -227,27 +227,17 @@ properties only apply to a ``TASK`` retry policy. * - Property name - Description - Default value - * - ``fault-tolerant-execution-target-task-input-size`` - - Target size in bytes of all task inputs for a single fault-tolerant task. - Applies to tasks that read input from spooled data written by other - tasks. + * - ``fault-tolerant-execution-standard-split-size`` + - Standard :ref:`split ` size processed by tasks that + read data from source tables. Value is interpreted with split weight + taken into account. If the weight of splits produced by a catalog denotes + that they are lighter or heavier than "standard" split, then the number + of splits processed by a single task is adjusted accordingly. May be overridden for the current session with the - ``fault_tolerant_execution_target_task_input_size`` + ``fault_tolerant_execution_standard_split_size`` :ref:`session property `. - - ``4GB`` - * - ``fault-tolerant-execution-target-task-split-count`` - - Target number of standard :ref:`splits ` processed - by a single task that reads data from source tables. Value is interpreted - with split weight taken into account. If the weight of splits produced by - a catalog denotes that they are lighter or heavier than "standard" split, - then the number of splits processed by single task is adjusted - accordingly. - - May be overridden for the current session with the - ``fault_tolerant_execution_target_task_split_count`` - :ref:`session property `. - - ``64`` + - ``64MB`` * - ``fault-tolerant-execution-max-task-split-count`` - Maximum number of :ref:`splits ` processed by a single task. This value is not split weight-adjusted and serves as @@ -258,6 +248,54 @@ properties only apply to a ``TASK`` retry policy. ``fault_tolerant_execution_max_task_split_count`` :ref:`session property `. - ``256`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period`` + - The number of tasks created for any given non-writer stage of arbitrary + distribution before task size is increased. + - ``64`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor`` + - Growth factor for adaptive sizing of non-writer tasks of arbitrary + distribution for fault-tolerant execution. Lower bound is 1.0. For every + task size increase, new task target size is old task target size + multiplied by this growth factor. + - ``1.2`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min`` + - Initial/minimum target input size for non-writer tasks of arbitrary + distribution of fault-tolerant execution. + - ``512MB`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max`` + - Maximum target input size for each non-writer task of arbitrary + distribution of fault-tolerant execution. + - ``50GB`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period`` + - The number of tasks created for any given writer stage of arbitrary + distribution before task size is increased. + - ``64`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor`` + - Growth factor for adaptive sizing of writer tasks of arbitrary + distribution for fault-tolerant execution. Lower bound is 1.0. For every + task size increase, new task target size is old task target size + multiplied by this growth factor. + - ``1.2`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min`` + - Initial/minimum target input size for writer tasks of arbitrary + distribution of fault-tolerant execution. + - ``4GB`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max`` + - Maximum target input size for writer tasks of arbitrary distribution + of fault-tolerant execution. + - ``50GB`` + * - ``fault-tolerant-execution-hash-distribution-compute-task-target-size`` + - Target input size for non-writer tasks of hash distribution of + fault-tolerant execution. + - ``512MB`` + * - ``fault-tolerant-execution-hash-distribution-write-task-target-size`` + - Target input size of writer tasks of hash distribution of fault-tolerant + execution. + - ``4GB`` + * - ``fault-tolerant-execution-hash-distribution-write-task-target-max-count`` + - Soft upper bound on number of writer tasks in a stage of hash + distribution of fault-tolerant execution. + - ``2000`` Node allocation ^^^^^^^^^^^^^^^