diff --git a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
index 5191519df577..2d964eb0d00a 100644
--- a/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/TaskManagerConfig.java
@@ -56,10 +56,10 @@ public class TaskManagerConfig
     private boolean shareIndexLoading;
     private int maxWorkerThreads = Runtime.getRuntime().availableProcessors() * 2;
     private Integer minDrivers;
-    private Integer initialSplitsPerNode;
+    private int initialSplitsPerNode = Runtime.getRuntime().availableProcessors() * 4;
     private int minDriversPerTask = 3;
     private int maxDriversPerTask = Integer.MAX_VALUE;
-    private Duration splitConcurrencyAdjustmentInterval = new Duration(100, TimeUnit.MILLISECONDS);
+    private Duration splitConcurrencyAdjustmentInterval = new Duration(1, TimeUnit.SECONDS);

     private DataSize sinkMaxBufferSize = DataSize.of(32, Unit.MEGABYTE);
     private DataSize sinkMaxBroadcastBufferSize = DataSize.of(200, Unit.MEGABYTE);
@@ -294,9 +294,6 @@ public TaskManagerConfig setMaxWorkerThreads(int maxWorkerThreads)
     @Min(1)
     public int getInitialSplitsPerNode()
     {
-        if (initialSplitsPerNode == null) {
-            return maxWorkerThreads;
-        }
         return initialSplitsPerNode;
     }

diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
index 361021493f25..c534d09939ae 100644
--- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java
@@ -45,8 +45,8 @@ public enum SplitsBalancingPolicy
     private int minCandidates = 10;
     private boolean includeCoordinator = true;
-    private int maxSplitsPerNode = 100;
-    private int minPendingSplitsPerTask = 10;
+    private int maxSplitsPerNode = 256;
+    private int minPendingSplitsPerTask = 16;
     private int maxAdjustedPendingSplitsWeightPerTask = 2000;
     private NodeSchedulerPolicy nodeSchedulerPolicy = NodeSchedulerPolicy.UNIFORM;
     private boolean optimizedLocalScheduling = true;
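Taken together, the two config files above double the initial split concurrency: `initialSplitsPerNode` previously stayed null and `getInitialSplitsPerNode()` fell back to `maxWorkerThreads` (twice the processor count), whereas it now defaults explicitly to four times the processor count, and the concurrency controller re-evaluates every second instead of every 100 ms. A minimal sketch of the effective defaults, assuming a hypothetical 16-core worker; the class and output are illustrative only:

```java
// Illustrative only: effective initial-splits defaults before and after
// this change, on a worker with e.g. 16 available processors.
public class InitialSplitsSketch
{
    public static void main(String[] args)
    {
        int processors = Runtime.getRuntime().availableProcessors();

        // Before: initialSplitsPerNode was null, so the getter fell back to
        // maxWorkerThreads = processors * 2 (32 splits on a 16-core node).
        int oldDefault = processors * 2;

        // After: an explicit default of processors * 4 (64 splits on a
        // 16-core node), decoupled from the worker thread pool size.
        int newDefault = processors * 4;

        System.out.printf("old=%d, new=%d%n", oldDefault, newDefault);
    }
}
```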
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
index 9f6c1d894e92..ffcb3abead06 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java
@@ -35,8 +35,8 @@ public void testDefaults()
         assertRecordedDefaults(recordDefaults(NodeSchedulerConfig.class)
                 .setNodeSchedulerPolicy(UNIFORM.name())
                 .setMinCandidates(10)
-                .setMaxSplitsPerNode(100)
-                .setMinPendingSplitsPerTask(10)
+                .setMaxSplitsPerNode(256)
+                .setMinPendingSplitsPerTask(16)
                 .setMaxAdjustedPendingSplitsWeightPerTask(2000)
                 .setMaxUnacknowledgedSplitsPerTask(2000)
                 .setIncludeCoordinator(true)
diff --git a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
index fc3d7cabee84..d48d2fde1082 100644
--- a/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/execution/TestTaskManagerConfig.java
@@ -41,8 +41,8 @@ public void testDefaults()
     {
         assertRecordedDefaults(recordDefaults(TaskManagerConfig.class)
                 .setThreadPerDriverSchedulerEnabled(false)
-                .setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 2)
-                .setSplitConcurrencyAdjustmentInterval(new Duration(100, TimeUnit.MILLISECONDS))
+                .setInitialSplitsPerNode(Runtime.getRuntime().availableProcessors() * 4)
+                .setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS))
                 .setStatusRefreshMaxWait(new Duration(1, TimeUnit.SECONDS))
                 .setInfoUpdateInterval(new Duration(3, TimeUnit.SECONDS))
                 .setTaskTerminationTimeout(new Duration(1, TimeUnit.MINUTES))
@@ -87,7 +87,7 @@ public void testExplicitPropertyMappings()
         Map<String, String> properties = ImmutableMap.<String, String>builder()
                 .put("experimental.thread-per-driver-scheduler-enabled", "true")
                 .put("task.initial-splits-per-node", "1")
-                .put("task.split-concurrency-adjustment-interval", "1s")
+                .put("task.split-concurrency-adjustment-interval", "3s")
                 .put("task.status-refresh-max-wait", "2s")
                 .put("task.info-update-interval", "2s")
                 .put("task.termination-timeout", "15s")
@@ -127,7 +127,7 @@ public void testExplicitPropertyMappings()
         TaskManagerConfig expected = new TaskManagerConfig()
                 .setThreadPerDriverSchedulerEnabled(true)
                 .setInitialSplitsPerNode(1)
-                .setSplitConcurrencyAdjustmentInterval(new Duration(1, TimeUnit.SECONDS))
+                .setSplitConcurrencyAdjustmentInterval(new Duration(3, TimeUnit.SECONDS))
                 .setStatusRefreshMaxWait(new Duration(2, TimeUnit.SECONDS))
                 .setInfoUpdateInterval(new Duration(2, TimeUnit.SECONDS))
                 .setTaskTerminationTimeout(new Duration(15, TimeUnit.SECONDS))
diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
index 7615c899da0e..64896d5d6401 100644
--- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
+++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestSourcePartitionedScheduler.java
@@ -507,9 +507,9 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(500, TestingSplit::createRemoteSplit)),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(4 * 300, TestingSplit::createRemoteSplit)),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(TEST_CATALOG_HANDLE)), stage::getAllTasks),
-                500,
+                4 * 300,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
                 new TableExecuteContextManager(),
                 () -> false);
@@ -518,10 +518,10 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
         ScheduleResult scheduleResult = scheduler.schedule();
         assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL);
         assertEquals(scheduleResult.getNewTasks().size(), 3);
-        assertEquals(scheduleResult.getSplitsScheduled(), 300);
+        assertEquals(scheduleResult.getSplitsScheduled(), 3 * 256);
         for (RemoteTask remoteTask : scheduleResult.getNewTasks()) {
             PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
-            assertEquals(splitsInfo.getCount(), 100);
+            assertEquals(splitsInfo.getCount(), 256);
         }

         // new node added - the pending splits should go to it since the child tasks are not blocked
@@ -529,7 +529,7 @@ public void testNewTaskScheduledWhenChildStageBufferIsUnderutilized()
         scheduleResult = scheduler.schedule();
         assertEquals(scheduleResult.getNewTasks().size(), 1);
         assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL); // split queue is full but still the source task creation isn't blocked
-        assertEquals(scheduleResult.getSplitsScheduled(), 100);
+        assertEquals(scheduleResult.getSplitsScheduled(), 256);
     }

     @Test
@@ -550,9 +550,9 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
         StageScheduler scheduler = newSourcePartitionedSchedulerAsStageScheduler(
                 stage,
                 TABLE_SCAN_NODE_ID,
-                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(400, TestingSplit::createRemoteSplit)),
+                new ConnectorAwareSplitSource(TEST_CATALOG_HANDLE, createFixedSplitSource(3 * 300, TestingSplit::createRemoteSplit)),
                 new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(session, Optional.of(TEST_CATALOG_HANDLE)), stage::getAllTasks),
-                400,
+                3 * 300,
                 new DynamicFilterService(metadata, functionManager, typeOperators, new DynamicFilterConfig()),
                 new TableExecuteContextManager(),
                 () -> true);
@@ -561,10 +561,10 @@ public void testNoNewTaskScheduledWhenChildStageBufferIsOverutilized()
         ScheduleResult scheduleResult = scheduler.schedule();
         assertEquals(scheduleResult.getBlockedReason().get(), SPLIT_QUEUES_FULL);
         assertEquals(scheduleResult.getNewTasks().size(), 3);
-        assertEquals(scheduleResult.getSplitsScheduled(), 300);
+        assertEquals(scheduleResult.getSplitsScheduled(), 3 * 256);
         for (RemoteTask remoteTask : scheduleResult.getNewTasks()) {
             PartitionedSplitsInfo splitsInfo = remoteTask.getPartitionedSplitsInfo();
-            assertEquals(splitsInfo.getCount(), 100);
+            assertEquals(splitsInfo.getCount(), 256);
         }

         // new node added but 1 child's output buffer is overutilized - so lockdown the tasks
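The updated scheduler-test assertions follow directly from the new `maxSplitsPerNode` default: three worker nodes capped at 256 standard-weight splits each absorb 3 * 256 = 768 splits on the first scheduling pass, 256 per task, and a node added later takes another 256. The fixed split sources are sized past the cap (4 * 300 and 3 * 300) so the queues fill and the scheduler reports `SPLIT_QUEUES_FULL`. A sketch of that arithmetic, assuming the three-node setup these tests build:

```java
// Illustrative check of the expectations encoded in the assertions above,
// assuming 3 initial worker nodes and the 256-splits-per-node cap.
public class SchedulerExpectationSketch
{
    public static void main(String[] args)
    {
        int nodes = 3;
        int maxSplitsPerNode = 256;

        int splitsScheduled = nodes * maxSplitsPerNode; // 768, as asserted
        int splitsPerTask = maxSplitsPerNode;           // 256 per new task
        int splitSourceSize = 4 * 300;                  // 1200 > 768, so queues fill

        System.out.printf("scheduled=%d, perTask=%d, remaining=%d%n",
                splitsScheduled, splitsPerTask, splitSourceSize - splitsScheduled);
    }
}
```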
diff --git a/docs/src/main/sphinx/admin/properties-node-scheduler.md b/docs/src/main/sphinx/admin/properties-node-scheduler.md
index 8213a90f7a6a..85a87a09f868 100644
--- a/docs/src/main/sphinx/admin/properties-node-scheduler.md
+++ b/docs/src/main/sphinx/admin/properties-node-scheduler.md
@@ -16,7 +16,7 @@ managing, and monitoring query execution.
 ## `node-scheduler.max-splits-per-node`

 - **Type:** {ref}`prop-type-integer`
-- **Default value:** `100`
+- **Default value:** `256`

 The target value for the total number of splits that can be running for
 each worker node, assuming all splits have the standard split weight.
@@ -40,7 +40,7 @@ not higher.
 ## `node-scheduler.min-pending-splits-per-task`

 - **Type:** {ref}`prop-type-integer`
-- **Default value:** `10`
+- **Default value:** `16`

 The minimum number of outstanding splits with the standard split weight
 guaranteed to be scheduled on a node (even when the node is already at the
 limit for total number of splits) for a single task given the task has
 remaining splits to process.
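Both documented limits are phrased in terms of the standard split weight: the caps are really weight budgets, so splits estimated lighter than standard admit proportionally more concurrent work. A hedged illustration of that proportionality; the raw weight values below are hypothetical and do not reflect the actual `SplitWeight` representation:

```java
// Hypothetical weight arithmetic, not the real SplitWeight API: the
// 256-split node cap is a budget of 256 standard weights, so splits at
// half the standard weight admit twice as many in flight.
public class SplitWeightSketch
{
    public static void main(String[] args)
    {
        long standardWeight = 100;              // assumed raw standard weight
        long nodeBudget = 256 * standardWeight; // node-scheduler.max-splits-per-node

        long halfWeight = standardWeight / 2;
        long admitted = nodeBudget / halfWeight; // 512 half-weight splits

        System.out.println(admitted);
    }
}
```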
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
index 02dbd79c6753..0f54cc3af1ec 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/HiveConfig.java
@@ -75,7 +75,7 @@ public class HiveConfig
     private DataSize maxSplitSize = DataSize.of(64, MEGABYTE);
     private int maxPartitionsPerScan = 1_000_000;
    private int maxPartitionsForEagerLoad = 100_000;
-    private int maxOutstandingSplits = 1_000;
+    private int maxOutstandingSplits = 3_000;
     private DataSize maxOutstandingSplitsSize = DataSize.of(256, MEGABYTE);
     private int maxSplitIteratorThreads = 1_000;
     private int minPartitionBatchSize = 10;
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
index 13541c238372..7abc47c55b61 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/TestHiveConfig.java
@@ -43,7 +43,7 @@ public void testDefaults()
                 .setMaxSplitSize(DataSize.of(64, Unit.MEGABYTE))
                 .setMaxPartitionsPerScan(1_000_000)
                 .setMaxPartitionsForEagerLoad(100_000)
-                .setMaxOutstandingSplits(1_000)
+                .setMaxOutstandingSplits(3_000)
                 .setMaxOutstandingSplitsSize(DataSize.of(256, Unit.MEGABYTE))
                 .setMaxSplitIteratorThreads(1_000)
                 .setPerTransactionMetastoreCacheMaximumSize(1000)
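The Hive change keeps the connector-side buffer ahead of the faster-draining scheduler: with workers now accepting up to 256 splits apiece, a 1,000-split backlog empties quickly, so the default rises to 3,000 while the 256 MB cap on buffered split metadata is unchanged. A hedged sketch of the two-limit relationship, not the connector's actual internals:

```java
// Hedged sketch, not Hive connector internals: split discovery backs off
// once either the split-count cap (now 3_000) or the unchanged 256 MB cap
// on buffered split metadata is reached, whichever comes first.
public class SplitBufferSketch
{
    static final int MAX_OUTSTANDING_SPLITS = 3_000;
    static final long MAX_OUTSTANDING_SPLITS_BYTES = 256L * 1024 * 1024;

    static boolean shouldPauseDiscovery(int bufferedSplits, long bufferedBytes)
    {
        return bufferedSplits >= MAX_OUTSTANDING_SPLITS
                || bufferedBytes >= MAX_OUTSTANDING_SPLITS_BYTES;
    }

    public static void main(String[] args)
    {
        System.out.println(shouldPauseDiscovery(3_000, 0)); // true: count cap hit
        System.out.println(shouldPauseDiscovery(100, 0));   // false: room to buffer
    }
}
```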