Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public class TaskManagerConfig
private boolean shareIndexLoading;
private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2;
private Integer minDrivers;
private Integer initialSplitsPerNode;
private int initialSplitsPerNode = Runtime.getRuntime().availableProcessors() * 4;
private int minDriversPerTask = 3;
private int maxDriversPerTask = Integer.MAX_VALUE;
private Duration splitConcurrencyAdjustmentInterval = new Duration(100, TimeUnit.MILLISECONDS);
private Duration splitConcurrencyAdjustmentInterval = new Duration(1, TimeUnit.SECONDS);
Comment thread
Dith3r marked this conversation as resolved.
Outdated

private DataSize sinkMaxBufferSize = DataSize.of(32, Unit.MEGABYTE);
private DataSize sinkMaxBroadcastBufferSize = DataSize.of(200, Unit.MEGABYTE);
Expand Down Expand Up @@ -294,9 +294,6 @@ public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads)
@Min(1)
public int getInitialSplitsPerNode()
{
if (initialSplitsPerNode == null) {
return maxWorkerThreads;
}
return initialSplitsPerNode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public enum SplitsBalancingPolicy

private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int minPendingSplitsPerTask = 10;
private int maxSplitsPerNode = 256;
private int minPendingSplitsPerTask = 16;
private int maxAdjustedPendingSplitsWeightPerTask = 2000;
private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
private boolean optimizedLocalScheduling = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public void testDefaults()
assertRecordedDefaults(recordDefaults(NodeSchedulerConfig.class)
.setNodeSchedulerPolicy(UNIFORM.name())
.setMinCandidates(10)
.setMaxSplitsPerNode(100)
.setMinPendingSplitsPerTask(10)
.setMaxSplitsPerNode(256)
.setMinPendingSplitsPerTask(16)
.setMaxAdjustedPendingSplitsWeightPerTask(2000)
.setMaxUnacknowledgedSplitsPerTask(2000)
.setIncludeCoordinator(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public void testDefaults()
{
assertRecordedDefaults(recordDefaults(TaskManagerConfig.class)
.setThreadPerDriverSchedulerEnabled(false)
.setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 2)
.setSplitConcurrencyAdjustmentInterval(new Duration(100, TimeUnit.MILLISECONDS))
.setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 4)
.setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS))
.setStatusRefreshMaxWait(new Duration(1, TimeUnit.SECONDS))
.setInfoUpdateInterval(new Duration(3, TimeUnit.SECONDS))
.setTaskTerminationTimeout(new Duration(1, TimeUnit.MINUTES))
Expand Down Expand Up @@ -87,7 +87,7 @@ public void testExplicitPropertyMappings()
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("experimental.thread-per-driver-scheduler-enabled", "true")
.put("task.initial-splits-per-node", "1")
.put("task.split-concurrency-adjustment-interval", "1s")
.put("task.split-concurrency-adjustment-interval", "3s")
.put("task.status-refresh-max-wait", "2s")
.put("task.info-update-interval", "2s")
.put("task.termination-timeout", "15s")
Expand Down Expand Up @@ -127,7 +127,7 @@ public void testExplicitPropertyMappings()
TaskManagerConfig expected = new TaskManagerConfig()
.setThreadPerDriverSchedulerEnabled(true)
.setInitialSplitsPerNode(1)
.setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS))
.setSplitConcurrencyAdjustmentInterval(new Duration(3, TimeUnit.SECONDS))
.setStatusRefreshMaxWait(new Duration(2, TimeUnit.SECONDS))
.setInfoUpdateInterval(new Duration(2, TimeUnit.SECONDS))
.setTaskTerminationTimeout(new Duration(15, TimeUnit.SECONDS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,9 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
stage,
TABLE_SCAN_NODE_ID,
new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(500, TestingSplit::createRemoteSplit)),
new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(4 * 300, TestingSplit::createRemoteSplit)),
new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(TEST_CATALOG_HANDLE)), stage::getAllTasks),
500,
4 * 300,
new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
new TableExecuteContextManager(),
() -> false);
Expand All @@ -518,18 +518,18 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
ScheduleResult scheduleResult = scheduler.schedule();
assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL);
assertEquals(scheduleResult.getNewTasks().size(), 3);
assertEquals(scheduleResult.getSplitsScheduled(), 300);
assertEquals(scheduleResult.getSplitsScheduled(), 3 * 256);
for (RemoteTask remoteTask : scheduleResult.getNewTasks()) {
PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
assertEquals(splitsInfo.getCount(), 100);
assertEquals(splitsInfo.getCount(), 256);
}

// new node added - the pending splits should go to it since the child tasks are not blocked
nodeManager.addNodes(new InternalNode("other4", URI.create("http://127.0.0.4:14"), NodeVersion.UNKNOWN, false));
scheduleResult = scheduler.schedule();
assertEquals(scheduleResult.getNewTasks().size(), 1);
assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL); // the split queue is full, but source task creation is still not blocked
assertEquals(scheduleResult.getSplitsScheduled(), 100);
assertEquals(scheduleResult.getSplitsScheduled(), 256);
}

@Test
Expand All @@ -550,9 +550,9 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
stage,
TABLE_SCAN_NODE_ID,
new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(400, TestingSplit::createRemoteSplit)),
new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(3 * 300, TestingSplit::createRemoteSplit)),
new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(TEST_CATALOG_HANDLE)), stage::getAllTasks),
400,
3 * 300,
new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
new TableExecuteContextManager(),
() -> true);
Expand All @@ -561,10 +561,10 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
ScheduleResult scheduleResult = scheduler.schedule();
assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL);
assertEquals(scheduleResult.getNewTasks().size(), 3);
assertEquals(scheduleResult.getSplitsScheduled(), 300);
assertEquals(scheduleResult.getSplitsScheduled(), 768);
for (RemoteTask remoteTask : scheduleResult.getNewTasks()) {
PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
assertEquals(splitsInfo.getCount(), 100);
assertEquals(splitsInfo.getCount(), 256);
}

// new node added, but one child's output buffer is overutilized - so lock down the tasks
Expand Down
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/admin/properties-node-scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ managing, and monitoring query execution.
## `node-scheduler.max-splits-per-node`

- **Type:** {ref}`prop-type-integer`
- **Default value:** `100`
- **Default value:** `256`

The target value for the total number of splits that can be running for
each worker node, assuming all splits have the standard split weight.
Expand All @@ -40,7 +40,7 @@ not higher.
## `node-scheduler.min-pending-splits-per-task`

- **Type:** {ref}`prop-type-integer`
- **Default value:** `10`
- **Default value:** `16`

The minimum number of outstanding splits, at the standard split weight, that are guaranteed to be scheduled on a node
for a single task, even when the node is already at its limit for the total number of splits, provided the task still has splits to process.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class HiveConfig
private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
private int maxPartitionsPerScan = 1_000_000;
private int maxPartitionsForEagerLoad = 100_000;
private int maxOutstandingSplits = 1_000;
private int maxOutstandingSplits = 3_000;
private DataSize maxOutstandingSplitsSize = DataSize.of(256, MEGABYTE);
private int maxSplitIteratorThreads = 1_000;
private int minPartitionBatchSize = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testDefaults()
.setMaxSplitSize(DataSize.of(64, Unit.MEGABYTE))
.setMaxPartitionsPerScan(1_000_000)
.setMaxPartitionsForEagerLoad(100_000)
.setMaxOutstandingSplits(1_000)
.setMaxOutstandingSplits(3_000)
.setMaxOutstandingSplitsSize(DataSize.of(256, Unit.MEGABYTE))
.setMaxSplitIteratorThreads(1_000)
.setPerTransactionMetastoreCacheMaximumSize(1000)
Expand Down