diff --git a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java index 6806c3f21775..52f901d0c71d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/QueryManagerConfig.java @@ -44,7 +44,8 @@ "experimental.max-queued-big-queries", "query-manager.initialization-required-workers", "query-manager.initialization-timeout", - " fault-tolerant-execution-target-task-split-count", + "fault-tolerant-execution-target-task-split-count", + "fault-tolerant-execution-target-task-input-size", "query.remote-task.max-consecutive-error-count"}) public class QueryManagerConfig { @@ -704,13 +705,14 @@ public int getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGr } @Config("fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period") - @ConfigDescription("The number of tasks we create for given non-writer stage of arbitrary distribution before we increase task size") + @ConfigDescription("The number of tasks created for any given non-writer stage of arbitrary distribution before task size is increased") public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod) { this.faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthPeriod; return this; } + @Min(1) public double getFaultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor() { return faultTolerantExecutionArbitraryDistributionComputeTaskTargetSizeGrowthFactor; @@ -758,13 +760,14 @@ public int getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrow } @Config("fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period") - 
@ConfigDescription("The number of tasks we create for given writer stage of arbitrary distribution before we increase task size") + @ConfigDescription("The number of tasks created for any given writer stage of arbitrary distribution before task size is increased") public QueryManagerConfig setFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod(int faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod) { this.faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod = faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthPeriod; return this; } + @Min(1) public double getFaultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor() { return faultTolerantExecutionArbitraryDistributionWriteTaskTargetSizeGrowthFactor; diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index 8dac7dc6f773..c7934a452ecb 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -227,27 +227,17 @@ properties only apply to a ``TASK`` retry policy. * - Property name - Description - Default value - * - ``fault-tolerant-execution-target-task-input-size`` - - Target size in bytes of all task inputs for a single fault-tolerant task. - Applies to tasks that read input from spooled data written by other - tasks. + * - ``fault-tolerant-execution-standard-split-size`` + - Standard :ref:`split <trino-concept-splits>` size processed by tasks that + read data from source tables. Value is interpreted with split weight + taken into account. If the weight of splits produced by a catalog denotes + that they are lighter or heavier than "standard" split, then the number + of splits processed by a single task is adjusted accordingly. 
May be overridden for the current session with the - ``fault_tolerant_execution_target_task_input_size`` + ``fault_tolerant_execution_standard_split_size`` :ref:`session property <session-properties-definition>`. - - ``4GB`` - * - ``fault-tolerant-execution-target-task-split-count`` - - Target number of standard :ref:`splits <trino-concept-splits>` processed - by a single task that reads data from source tables. Value is interpreted - with split weight taken into account. If the weight of splits produced by - a catalog denotes that they are lighter or heavier than "standard" split, - then the number of splits processed by single task is adjusted - accordingly. - - May be overridden for the current session with the - ``fault_tolerant_execution_target_task_split_count`` - :ref:`session property <session-properties-definition>`. - - ``64`` + - ``64MB`` * - ``fault-tolerant-execution-max-task-split-count`` - Maximum number of :ref:`splits <trino-concept-splits>` processed by a single task. This value is not split weight-adjusted and serves as @@ -258,6 +248,54 @@ properties only apply to a ``TASK`` retry policy. ``fault_tolerant_execution_max_task_split_count`` :ref:`session property <session-properties-definition>`. - ``256`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-period`` + - The number of tasks created for any given non-writer stage of arbitrary + distribution before task size is increased. + - ``64`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-growth-factor`` + - Growth factor for adaptive sizing of non-writer tasks of arbitrary + distribution for fault-tolerant execution. Lower bound is 1.0. For every + task size increase, new task target size is old task target size + multiplied by this growth factor. + - ``1.2`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-min`` + - Initial/minimum target input size for non-writer tasks of arbitrary + distribution of fault-tolerant execution. 
+ - ``512MB`` + * - ``fault-tolerant-execution-arbitrary-distribution-compute-task-target-size-max`` + - Maximum target input size for each non-writer task of arbitrary + distribution of fault-tolerant execution. + - ``50GB`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-period`` + - The number of tasks created for any given writer stage of arbitrary + distribution before task size is increased. + - ``64`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-growth-factor`` + - Growth factor for adaptive sizing of writer tasks of arbitrary + distribution for fault-tolerant execution. Lower bound is 1.0. For every + task size increase, new task target size is old task target size + multiplied by this growth factor. + - ``1.2`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-min`` + - Initial/minimum target input size for writer tasks of arbitrary + distribution of fault-tolerant execution. + - ``4GB`` + * - ``fault-tolerant-execution-arbitrary-distribution-write-task-target-size-max`` + - Maximum target input size for writer tasks of arbitrary distribution + of fault-tolerant execution. + - ``50GB`` + * - ``fault-tolerant-execution-hash-distribution-compute-task-target-size`` + - Target input size for non-writer tasks of hash distribution of + fault-tolerant execution. + - ``512MB`` + * - ``fault-tolerant-execution-hash-distribution-write-task-target-size`` + - Target input size of writer tasks of hash distribution of fault-tolerant + execution. + - ``4GB`` + * - ``fault-tolerant-execution-hash-distribution-write-task-target-max-count`` + - Soft upper bound on number of writer tasks in a stage of hash + distribution of fault-tolerant execution. + - ``2000`` Node allocation ^^^^^^^^^^^^^^^ @@ -305,15 +343,34 @@ fault-tolerant execution: reschedule tasks in case of a failure. 
- (JVM heap size * 0.15) - Only ``TASK`` - * - ``fault-tolerant-execution-partition-count`` - - Number of partitions to use for distributed joins and aggregations, - similar in function to the ``query.hash-partition-count`` :doc:`query - management property </admin/properties-query-management>`. It is not - recommended to increase this property value above the default of ``50``, - which may result in instability and poor performance. May be overridden - for the current session with the - ``fault_tolerant_execution_partition_count`` :ref:`session property - <session-properties-definition>`. + * - ``fault-tolerant-execution-max-partition-count`` + - Maximum number of partitions to use for distributed joins and + aggregations, similar in function to the + ``query.max-hash-partition-count`` :doc:`query management property + </admin/properties-query-management>`. It is not recommended to increase + this property value above the default of ``50``, which may result in + instability and poor performance. May be overridden for the current + session with the ``fault_tolerant_execution_max_partition_count`` + :ref:`session property <session-properties-definition>`. + - ``50`` + - Only ``TASK`` + * - ``fault-tolerant-execution-min-partition-count`` + - Minimum number of partitions to use for distributed joins and + aggregations, similar in function to the + ``query.min-hash-partition-count`` :doc:`query management property + </admin/properties-query-management>`. May be overridden for the current + session with the ``fault_tolerant_execution_min_partition_count`` + :ref:`session property <session-properties-definition>`. + - ``4`` + - Only ``TASK`` + * - ``fault-tolerant-execution-min-partition-count-for-write`` + - Minimum number of partitions to use for distributed joins and + aggregations in write queries, similar in function to the + ``query.min-hash-partition-count-for-write`` :doc:`query management + property </admin/properties-query-management>`. May be overridden for + the current session with the + ``fault_tolerant_execution_min_partition_count_for_write`` + :ref:`session property <session-properties-definition>`. 
- ``50`` - Only ``TASK`` * - ``max-tasks-waiting-for-node-per-stage`` diff --git a/docs/src/main/sphinx/admin/properties-query-management.rst b/docs/src/main/sphinx/admin/properties-query-management.rst index a5d7e754e22e..b6ae52a6868d 100644 --- a/docs/src/main/sphinx/admin/properties-query-management.rst +++ b/docs/src/main/sphinx/admin/properties-query-management.rst @@ -29,6 +29,16 @@ stages of a query. You can use the following execution policies: dependencies typically prevent full processing and cause longer queue times which increases the query wall time overall. +``query.determine-partition-count-for-write-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-boolean` +* **Default value:** ``false`` +* **Session property:** ``determine_partition_count_for_write_enabled`` + +Enables determining the number of partitions based on amount of data read and processed by the +query for write queries. + ``query.max-hash-partition-count`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -49,6 +59,16 @@ joins, aggregations, partitioned window functions and others. The minimum number of partitions to use for processing distributed operations, such as joins, aggregations, partitioned window functions and others. +``query.min-hash-partition-count-for-write`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-integer` +* **Default value:** ``50`` +* **Session property:** ``min_hash_partition_count_for_write`` + +The minimum number of partitions to use for processing distributed operations in write queries, +such as joins, aggregations, partitioned window functions and others. + ``query.max-writer-tasks-count`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^