Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ public class TaskManagerConfig
private Duration interruptStuckSplitTasksDetectionInterval = new Duration(2, TimeUnit.MINUTES);

private boolean scaleWritersEnabled = true;
// Set the value of default max writer count to 2 * max(the number of processors, 32). We can do this
// Set the value of default max writer count to the number of processors and cap it to 32. We can do this
// because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
// use this property. Hence, there is no risk in terms of more numbers of physical writers which can cause high
// resource utilization.
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32) * 2;
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount(), 32);
private int writerCount = 1;
// Default value of partitioned task writer count should be above 1, otherwise it can create a plan
// with a single gather exchange node on the coordinator due to a single available processor. Whereas,
// on the worker nodes due to more available processors, the default value could be above 1. Therefore,
// it can cause error due to config mismatch during execution. Additionally, cap it to 32 in order to
// avoid small pages produced by local partitioning exchanges.
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2;
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
// Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
// exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
// more available processors, the default value could be above 1. Therefore, it can cause error due to config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
public class TestTaskManagerConfig
{
private static final int DEFAULT_PROCESSOR_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32);
private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32) * 2;
private static final int DEFAULT_PARTITIONED_WRITER_COUNT = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 2), 32) * 2;
private static final int DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT = min(getAvailablePhysicalProcessorCount(), 32);

@Test
public void testDefaults()
Expand Down Expand Up @@ -66,7 +65,7 @@ public void testDefaults()
.setScaleWritersEnabled(true)
.setScaleWritersMaxWriterCount(DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT)
.setWriterCount(1)
.setPartitionedWriterCount(DEFAULT_PARTITIONED_WRITER_COUNT)
.setPartitionedWriterCount(DEFAULT_PROCESSOR_COUNT)
.setTaskConcurrency(DEFAULT_PROCESSOR_COUNT)
.setHttpResponseThreads(100)
.setHttpTimeoutThreads(3)
Expand All @@ -85,7 +84,6 @@ public void testExplicitPropertyMappings()
{
int processorCount = DEFAULT_PROCESSOR_COUNT == 32 ? 16 : 32;
int maxWriterCount = DEFAULT_SCALE_WRITERS_MAX_WRITER_COUNT == 32 ? 16 : 32;
int partitionedWriterCount = DEFAULT_PARTITIONED_WRITER_COUNT == 64 ? 32 : 64;
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("task.initial-splits-per-node", "1")
.put("task.split-concurrency-adjustment-interval", "1s")
Expand All @@ -112,7 +110,7 @@ public void testExplicitPropertyMappings()
.put("task.scale-writers.enabled", "false")
.put("task.scale-writers.max-writer-count", Integer.toString(maxWriterCount))
.put("task.writer-count", "4")
.put("task.partitioned-writer-count", Integer.toString(partitionedWriterCount))
.put("task.partitioned-writer-count", Integer.toString(processorCount))
.put("task.concurrency", Integer.toString(processorCount))
.put("task.http-response-threads", "4")
.put("task.http-timeout-threads", "10")
Expand Down Expand Up @@ -152,7 +150,7 @@ public void testExplicitPropertyMappings()
.setScaleWritersEnabled(false)
.setScaleWritersMaxWriterCount(maxWriterCount)
.setWriterCount(4)
.setPartitionedWriterCount(partitionedWriterCount)
.setPartitionedWriterCount(processorCount)
.setTaskConcurrency(processorCount)
.setHttpResponseThreads(4)
.setHttpTimeoutThreads(10)
Expand Down