diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java index 1738003830ed..ddd2fb50aa09 100644 --- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java @@ -78,18 +78,18 @@ public class TaskManagerConfig private Duration interruptStuckSplitTasksDetectionInterval = new Duration(2, TimeUnit.MINUTES); private boolean scaleWritersEnabled = true; - // Set the value of default max writer count to 2 * max(the number of processors, 32). We can do this + // Set the value of default max writer count to the number of processors and cap it to 32. We can do this // because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never // use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high // resource utilization. - private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2; + private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32); private int writerCount = 1; // Default value of partitioned task writer count should be above 1, otherwise it can create a plan // with a single gather exchange node on the coordinator due to a single available processor. Whereas, // on the worker nodes due to more available processors, the default value could be above 1. Therefore, // it can cause error due to config mismatch during execution. Additionally, cap it to 32 in order to // avoid small pages produced by local partitioning exchanges. - private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2; + private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); // Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather // exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to // more available processors, the default value could be above 1. Therefore, it can cause error due to config diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java index 97628cd9b274..ed74d9203560 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java @@ -34,8 +34,7 @@ public class TestTaskManagerConfig { private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32); - private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32) * 2; - private static final int DEFAULT_PARTITIONED_WRITER_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2; + private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32); @Test public void testDefaults() @@ -66,7 +65,7 @@ public void testDefaults() .setScaleWritersEnabled(true) .setScaleWritersMaxWriterCount(DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT) .setWriterCount(1) - .setPartitionedWriterCount(DEFAULT_PARTITIONED_WRITER_COUNT) + .setPartitionedWriterCount(DEFAULT_PROCESSOR_COUNT) .setTaskConcurrency(DEFAULT_PROCESSOR_COUNT) .setHttpResponseThreads(100) .setHttpTimeoutThreads(3) @@ -85,7 +84,6 @@ public void testExplicitPropertyMappings() { int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32; int maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32; - int partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT == 64 ? 32 : 64; Map properties = ImmutableMap.builder() .put("task.initial-splits-per-node", "1") .put("task.split-concurrency-adjustment-interval", "1s") @@ -112,7 +110,7 @@ public void testExplicitPropertyMappings() .put("task.scale-writers.enabled", "false") .put("task.scale-writers.max-writer-count", Integer.toString(maxWriterCount)) .put("task.writer-count", "4") - .put("task.partitioned-writer-count", Integer.toString(partitionedWriterCount)) + .put("task.partitioned-writer-count", Integer.toString(processorCount)) .put("task.concurrency", Integer.toString(processorCount)) .put("task.http-response-threads", "4") .put("task.http-timeout-threads", "10") @@ -152,7 +150,7 @@ public void testExplicitPropertyMappings() .setScaleWritersEnabled(false) .setScaleWritersMaxWriterCount(maxWriterCount) .setWriterCount(4) - .setPartitionedWriterCount(partitionedWriterCount) + .setPartitionedWriterCount(processorCount) .setTaskConcurrency(processorCount) .setHttpResponseThreads(4) .setHttpTimeoutThreads(10)