diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index 8a5770a92811..7e30b97a6273 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -32,6 +32,7 @@ import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; +import static java.lang.Math.max; import static java.lang.Math.min; @DefunctConfig({ @@ -81,10 +82,18 @@ public class TaskManagerConfig // more resources, hence potentially affect the other concurrent queries in the cluster. private int scaleWritersMaxWriterCount = 8; private int writerCount = 1; - // cap partitioned task writer count to 32 in order to avoid small pages produced by local partitioning exchanges - private int partitionedWriterCount = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32); - // cap task concurrency to 32 in order to avoid small pages produced by local partitioning exchanges - private int taskConcurrency = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32); + // Default value of partitioned task writer count should be above 1, otherwise it can create a plan + // with a single gather exchange node on the coordinator due to a single available processor. Whereas, + // on the worker nodes due to more available processors, the default value could be above 1. Therefore, + // it can cause error due to config mismatch during execution. Additionally, cap it to 32 in order to + // avoid small pages produced by local partitioning exchanges. + private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); + // Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather + // exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to + // more available processors, the default value could be above 1. Therefore, it can cause error due to config + // mismatch during execution. Additionally, cap it to 32 in order to avoid small pages produced by local + // partitioning exchanges. + private int taskConcurrency = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); private int httpResponseThreads = 100; private int httpTimeoutThreads = 3; diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 5f9e61d3db60..e9c4d4494ca2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -28,11 +28,12 @@ import static io.airlift.units.DataSize.Unit; import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount; import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo; +import static java.lang.Math.max; import static java.lang.Math.min; public class TestTaskManagerConfig { - private static final int DEFAULT_PROCESSOR_COUNT = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32); + private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); @Test public void testDefaults() diff --git a/docs/src/main/sphinx/admin/properties-task.rst b/docs/src/main/sphinx/admin/properties-task.rst index ef2d48c9bff7..8a737d075969 100644 --- a/docs/src/main/sphinx/admin/properties-task.rst +++ b/docs/src/main/sphinx/admin/properties-task.rst @@ -7,7 +7,7 @@ Task properties * **Type:** :ref:`prop-type-integer` * **Restrictions:** Must be a power of two -* **Default value:** min(number of physical CPUs of the node, 32) +* **Default value:** min(max(number of physical CPUs of the node, 2), 32) Default local concurrency for parallel operators, such as joins and aggregations. This value should be adjusted up or down based on the query concurrency and worker @@ -147,7 +147,7 @@ This can also be specified on a per-query basis using the ``task_writer_count`` * **Type:** :ref:`prop-type-integer` * **Restrictions:** Must be a power of two -* **Default value:** min(number of physical CPUs of the node, 32) +* **Default value:** min(max(number of physical CPUs of the node, 2), 32) The number of concurrent writer threads per worker per query when :ref:`preferred partitioning ` is used. Increasing this value may