Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

Comment thread
gaurav8297 marked this conversation as resolved.
Outdated
import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount;
import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

@DefunctConfig({
Expand Down Expand Up @@ -81,10 +82,18 @@ public class TaskManagerConfig
// more resources, hence potentially affect the other concurrent queries in the cluster.
private int scaleWritersMaxWriterCount = 8;
private int writerCount = 1;
// cap partitioned task writer count to 32 in order to avoid small pages produced by local partitioning exchanges
private int partitionedWriterCount = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);
// cap task concurrency to 32 in order to avoid small pages produced by local partitioning exchanges
private int taskConcurrency = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);
// Default value of partitioned task writer count should be above 1, otherwise it can create a plan
// with a single gather exchange node on the coordinator due to a single available processor. Whereas,
// on the worker nodes due to more available processors, the default value could be above 1. Therefore,
// it can cause error due to config mismatch during execution. Additionally, cap it to 32 in order to
// avoid small pages produced by local partitioning exchanges.
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
// Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
// exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
// more available processors, the default value could be above 1. Therefore, it can cause error due to config
// mismatch during execution. Additionally, cap it to 32 in order to avoid small pages produced by local
// partitioning exchanges.
private int taskConcurrency = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
private int httpResponseThreads = 100;
private int httpTimeoutThreads = 3;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@
import static io.airlift.units.DataSize.Unit;
import static io.trino.util.MachineInfo.getAvailablePhysicalProcessorCount;
import static it.unimi.dsi.fastutil.HashCommon.nextPowerOfTwo;
import static java.lang.Math.max;
import static java.lang.Math.min;

public class TestTaskManagerConfig
{
private static final int DEFAULT_PROCESSOR_COUNT = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);
private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);

@Test
public void testDefaults()
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/admin/properties-task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Task properties

* **Type:** :ref:`prop-type-integer`
* **Restrictions:** Must be a power of two
* **Default value:** min(number of physical CPUs of the node, 32)
* **Default value:** min(max(number of physical CPUs of the node, 2), 32)

Default local concurrency for parallel operators, such as joins and aggregations.
This value should be adjusted up or down based on the query concurrency and worker
Expand Down Expand Up @@ -147,7 +147,7 @@ This can also be specified on a per-query basis using the ``task_writer_count``

* **Type:** :ref:`prop-type-integer`
* **Restrictions:** Must be a power of two
* **Default value:** min(number of physical CPUs of the node, 32)
* **Default value:** min(max(number of physical CPUs of the node, 2), 32)

The number of concurrent writer threads per worker per query when
:ref:`preferred partitioning <preferred-write-partitioning>` is used. Increasing this value may
Expand Down