From a86be1a3cd2336f1884371535221dbb23713fa93 Mon Sep 17 00:00:00 2001
From: Sergey Pershin
Date: Fri, 12 Sep 2025 16:05:49 -0700
Subject: [PATCH] Add ability to schedule splits based on Task load, not Node
 load.

---
 .../main/sphinx/admin/properties-session.rst  |  14 +++
 .../src/main/sphinx/admin/properties.rst      |  24 ++++
 .../presto/SystemSessionProperties.java       |  11 ++
 .../scheduler/NodeAssignmentStats.java        |   6 +
 .../execution/scheduler/NodeScheduler.java    |   5 +
 .../scheduler/NodeSchedulerConfig.java        |  29 +++++
 .../nodeSelection/SimpleNodeSelector.java     |  49 +++++++-
 .../presto/execution/TestNodeScheduler.java   | 117 +++++++++++++++++-
 .../execution/TestNodeSchedulerConfig.java    |   6 +
 9 files changed, 257 insertions(+), 4 deletions(-)

diff --git a/presto-docs/src/main/sphinx/admin/properties-session.rst b/presto-docs/src/main/sphinx/admin/properties-session.rst
index dedfd7d26f2de..d74c98dff59c3 100644
--- a/presto-docs/src/main/sphinx/admin/properties-session.rst
+++ b/presto-docs/src/main/sphinx/admin/properties-session.rst
@@ -434,6 +434,20 @@ Use this to optimize the ``map_filter()`` and ``map_subset()`` function.
 
 It controls if subfields access is executed at the data source or not.
 
+``schedule_splits_based_on_task_load``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+* **Type:** ``boolean``
+* **Default value:** ``false``
+
+If ``true``, splits are scheduled to tasks based on task load rather than on node load.
+This is particularly useful for the native worker, as it runs splits for tasks differently than the Java worker.
+The matching configuration property is ``node-scheduler.schedule-splits-based-on-task-load``; the per-task
+split target is set by :ref:`admin/properties:\`\`node-scheduler.max-splits-per-task\`\``.
+
+Set to ``true`` to enable it, as shown in this example:
+
+``SET SESSION schedule_splits_based_on_task_load=true;``
+
 
 JDBC Properties
 ---------------
diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst
index deb34f706af65..7a0a8de99348f 100644
--- a/presto-docs/src/main/sphinx/admin/properties.rst
+++ b/presto-docs/src/main/sphinx/admin/properties.rst
@@ -725,6 +725,30 @@ due to splits not being balanced across workers. Ideally, it should be set
 such that there is always at least one split waiting to be processed, but
 not higher.
 
+``node-scheduler.max-splits-per-task``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+* **Type:** ``integer``
+* **Default value:** ``10``
+
+The target value for the number of splits that can be running for
+each task, assuming all splits have the standard split weight.
+
+Using a higher value is recommended if task parallelism is higher than 10.
+Increasing this value may improve query latency by ensuring that the workers
+have enough splits to keep them fully utilized.
+
+When connectors support weight-based split scheduling, the number of splits
+assigned will depend on the weight of the individual splits. If splits are
+small, more of them are allowed to be assigned to each worker to compensate.
+
+Setting this too high will waste memory and may result in lower performance
+due to splits not being balanced across workers. Ideally, it should be set
+such that there is always at least one split waiting to be processed, but
+not higher.
+
+This limit applies when the :ref:`admin/properties-session:\`\`schedule_splits_based_on_task_load\`\`` session property is enabled.
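+
+As a minimal illustration (the values below are arbitrary, not tuning
+recommendations), the two properties can be set together in
+``etc/config.properties``:
+
+.. code-block:: none
+
+    node-scheduler.schedule-splits-based-on-task-load=true
+    node-scheduler.max-splits-per-task=20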
+
 ``node-scheduler.max-pending-splits-per-task``
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java
index f335f1f718c57..d5a199b253664 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/SystemSessionProperties.java
@@ -244,6 +244,7 @@ public final class SystemSessionProperties
     public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy";
     public static final String JOINS_NOT_NULL_INFERENCE_STRATEGY = "joins_not_null_inference_strategy";
     public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy";
+    public static final String SCHEDULE_SPLITS_BASED_ON_TASK_LOAD = "schedule_splits_based_on_task_load";
     public static final String HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED = "heap_dump_on_exceeded_memory_limit_enabled";
     public static final String EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY = "exceeded_memory_limit_heap_dump_file_directory";
     public static final String DISTRIBUTED_TRACING_MODE = "distributed_tracing_mode";
@@ -1422,6 +1423,11 @@ public SystemSessionProperties(
                         false,
                         value -> ResourceAwareSchedulingStrategy.valueOf(((String) value).toUpperCase()),
                         ResourceAwareSchedulingStrategy::name),
+                booleanProperty(
+                        SCHEDULE_SPLITS_BASED_ON_TASK_LOAD,
+                        "Schedule splits based on task load, rather than on the node load.",
+                        nodeSchedulerConfig.isScheduleSplitsBasedOnTaskLoad(),
+                        false),
                 stringProperty(
                         ANALYZER_TYPE,
                         "Analyzer type to use.",
@@ -2917,6 +2923,11 @@ public static ResourceAwareSchedulingStrategy getResourceAwareSchedulingStrategy
     {
         return session.getSystemProperty(RESOURCE_AWARE_SCHEDULING_STRATEGY, ResourceAwareSchedulingStrategy.class);
     }
 
+    public static boolean isScheduleSplitsBasedOnTaskLoad(Session session)
+    {
+        return session.getSystemProperty(SCHEDULE_SPLITS_BASED_ON_TASK_LOAD, Boolean.class);
+    }
+
     public static String getAnalyzerType(Session session)
     {
         return session.getSystemProperty(ANALYZER_TYPE, String.class);
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java
index 7b4c7707e1292..feab5116aa8fa 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java
@@ -70,6 +70,12 @@ public long getQueuedSplitsWeightForStage(InternalNode node)
         return stageInfo == null ? 0 : stageInfo.getQueuedSplitsWeight();
     }
 
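+    // Weight of the splits that the current scheduling pass has assigned to this
+    // stage's task on the given node and that the task's reported status does not yet include.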
+    public long getAssignedSplitsWeightForStage(InternalNode node)
+    {
+        PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
+        return stageInfo == null ? 0 : stageInfo.getAssignedSplitsWeight();
+    }
+
     public int getUnacknowledgedSplitCountForStage(InternalNode node)
     {
         PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
index 6b05db702bc49..1f3f45f269b47 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
@@ -61,6 +61,7 @@
 import static com.facebook.airlift.concurrent.MoreFutures.whenAnyCompleteCancelOthers;
 import static com.facebook.presto.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask;
 import static com.facebook.presto.SystemSessionProperties.getResourceAwareSchedulingStrategy;
+import static com.facebook.presto.SystemSessionProperties.isScheduleSplitsBasedOnTaskLoad;
 import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType;
 import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.ResourceAwareSchedulingStrategy;
 import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.ResourceAwareSchedulingStrategy.TTL;
@@ -91,6 +92,7 @@ public class NodeScheduler
     private final int minCandidates;
     private final boolean includeCoordinator;
     private final long maxSplitsWeightPerNode;
+    private final long maxSplitsWeightPerTask;
     private final long maxPendingSplitsWeightPerTask;
     private final NodeTaskMap nodeTaskMap;
     private final boolean useNetworkTopology;
@@ -146,6 +148,7 @@ public NodeScheduler(
         int maxPendingSplitsPerTask = config.getMaxPendingSplitsPerTask();
         checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be > maxPendingSplitsPerTask");
         this.maxSplitsWeightPerNode = SplitWeight.rawValueForStandardSplitCount(maxSplitsPerNode);
+        this.maxSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(config.getMaxSplitsPerTask());
         this.maxPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxPendingSplitsPerTask);
         this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
         this.useNetworkTopology = !config.getNetworkTopology().equals(NetworkTopologyType.LEGACY);
@@ -231,9 +234,11 @@ public NodeSelector createNodeSelector(Session session, ConnectorId connectorId,
                     nodeSelectionStats,
                     nodeTaskMap,
                     includeCoordinator,
+                    isScheduleSplitsBasedOnTaskLoad(session),
                     nodeMap,
                     minCandidates,
                     maxSplitsWeightPerNode,
+                    maxSplitsWeightPerTask,
                     maxPendingSplitsWeightPerTask,
                     maxUnacknowledgedSplitsPerTask,
                     maxTasksPerStage,
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java
index eb3e20992edc6..34aa69d9be896 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java
@@ -33,6 +33,8 @@ public static class NetworkTopologyType
     private int minCandidates = 10;
     private boolean includeCoordinator = true;
     private int maxSplitsPerNode = 100;
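+    // The two fields below back the task-load scheduling mode; maxSplitsPerTask is only
+    // consulted when that mode is enabled (via this config or the session property).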
+    private int maxSplitsPerTask = 10;
+    private boolean scheduleSplitsBasedOnTaskLoad;
     private int maxPendingSplitsPerTask = 10;
     private int maxUnacknowledgedSplitsPerTask = 500;
     private String networkTopology = NetworkTopologyType.LEGACY;
@@ -106,6 +108,33 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode)
         return this;
     }
 
+    public int getMaxSplitsPerTask()
+    {
+        return maxSplitsPerTask;
+    }
+
+    @Config("node-scheduler.max-splits-per-task")
+    @ConfigDescription("The number of splits weighted at the standard split weight that are allowed to be scheduled for each task " +
+            "when scheduling splits based on the task load.")
+    public NodeSchedulerConfig setMaxSplitsPerTask(int maxSplitsPerTask)
+    {
+        this.maxSplitsPerTask = maxSplitsPerTask;
+        return this;
+    }
+
+    public boolean isScheduleSplitsBasedOnTaskLoad()
+    {
+        return scheduleSplitsBasedOnTaskLoad;
+    }
+
+    @Config("node-scheduler.schedule-splits-based-on-task-load")
+    @ConfigDescription("Schedule splits based on task load, rather than on the node load")
+    public NodeSchedulerConfig setScheduleSplitsBasedOnTaskLoad(boolean scheduleSplitsBasedOnTaskLoad)
+    {
+        this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
+        return this;
+    }
+
     @Min(1)
     public int getMaxUnacknowledgedSplitsPerTask()
     {
diff --git a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java
index a7205852ea4f5..3f9526b064d98 100644
--- a/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java
+++ b/presto-main-base/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java
@@ -16,6 +16,7 @@
 import com.facebook.airlift.log.Logger;
 import com.facebook.presto.execution.NodeTaskMap;
 import com.facebook.presto.execution.RemoteTask;
+import com.facebook.presto.execution.TaskStatus;
 import com.facebook.presto.execution.scheduler.BucketNodeMap;
 import com.facebook.presto.execution.scheduler.InternalNodeInfo;
 import com.facebook.presto.execution.scheduler.NodeAssignmentStats;
@@ -35,8 +36,10 @@
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
@@ -71,9 +74,11 @@ public class SimpleNodeSelector
     private final NodeSelectionStats nodeSelectionStats;
     private final NodeTaskMap nodeTaskMap;
     private final boolean includeCoordinator;
+    private final boolean scheduleSplitsBasedOnTaskLoad;
     private final AtomicReference<Supplier<NodeMap>> nodeMap;
     private final int minCandidates;
     private final long maxSplitsWeightPerNode;
+    private final long maxSplitsWeightPerTask;
     private final long maxPendingSplitsWeightPerTask;
     private final int maxUnacknowledgedSplitsPerTask;
     private final int maxTasksPerStage;
@@ -84,9 +89,11 @@ public SimpleNodeSelector(
             NodeSelectionStats nodeSelectionStats,
             NodeTaskMap nodeTaskMap,
             boolean includeCoordinator,
+            boolean scheduleSplitsBasedOnTaskLoad,
             Supplier<NodeMap> nodeMap,
             int minCandidates,
             long maxSplitsWeightPerNode,
+            long maxSplitsWeightPerTask,
             long maxPendingSplitsWeightPerTask,
             int maxUnacknowledgedSplitsPerTask,
             int maxTasksPerStage,
@@ -96,9 +103,11 @@ public SimpleNodeSelector(
         this.nodeSelectionStats = requireNonNull(nodeSelectionStats, "nodeSelectionStats is null");
         this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
         this.includeCoordinator = includeCoordinator;
+        this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
         this.nodeMap = new AtomicReference<>(nodeMap);
         this.minCandidates = minCandidates;
         this.maxSplitsWeightPerNode = maxSplitsWeightPerNode;
+        this.maxSplitsWeightPerTask = maxSplitsWeightPerTask;
         this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
         this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
         checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
@@ -149,6 +158,11 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
         Set<InternalNode> blockedExactNodes = new HashSet<>();
         boolean splitWaitingForAnyNode = false;
 
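+        // When task-load scheduling is enabled, rank candidate nodes by the load of this
+        // stage's task on each node instead of by the node-wide split load.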
+        Optional<ToLongFunction<InternalNode>> taskLoadSplitWeightProvider = Optional.empty();
+        if (this.scheduleSplitsBasedOnTaskLoad) {
+            taskLoadSplitWeightProvider = Optional.of(createTaskLoadSplitWeightProvider(existingTasks, assignmentStats));
+        }
+
         NodeProvider nodeProvider = nodeMap.getNodeProvider(maxPreferredNodes);
         OptionalInt preferredNodeCount = OptionalInt.empty();
         for (Split split : splits) {
@@ -179,9 +193,16 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
-            Optional<InternalNodeInfo> chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getTotalSplitsWeight, preferredNodeCount, maxSplitsWeightPerNode, assignmentStats);
-            if (!chosenNodeInfo.isPresent()) {
-                chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getQueuedSplitsWeightForStage, preferredNodeCount, maxPendingSplitsWeightPerTask, assignmentStats);
+            Optional<InternalNodeInfo> chosenNodeInfo = Optional.empty();
+
+            if (taskLoadSplitWeightProvider.isPresent()) {
+                chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, taskLoadSplitWeightProvider.get(), preferredNodeCount, maxSplitsWeightPerTask, assignmentStats);
+            }
+            else {
+                chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getTotalSplitsWeight, preferredNodeCount, maxSplitsWeightPerNode, assignmentStats);
+                if (!chosenNodeInfo.isPresent()) {
+                    chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getQueuedSplitsWeightForStage, preferredNodeCount, maxPendingSplitsWeightPerTask, assignmentStats);
+                }
             }
 
             if (chosenNodeInfo.isPresent()) {
@@ -223,6 +244,28 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
+    private ToLongFunction<InternalNode> createTaskLoadSplitWeightProvider(List<RemoteTask> existingTasks, NodeAssignmentStats assignmentStats)
+    {
+        // Create a map from nodeId to RemoteTask for efficient lookup
+        Map<String, RemoteTask> tasksByNodeId = new HashMap<>();
+        for (RemoteTask task : existingTasks) {
+            tasksByNodeId.put(task.getNodeId(), task);
+        }
+
+        return node -> {
+            RemoteTask remoteTask = tasksByNodeId.get(node.getNodeIdentifier());
+            if (remoteTask == null) {
+                // No task on this node yet; count only the splits already assigned to the stage
+                return assignmentStats.getAssignedSplitsWeightForStage(node);
+            }
+
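+            // Task load = splits queued on the task + splits currently running on it
+            // + splits this scheduling pass has assigned that the task has not yet reported.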
+            TaskStatus taskStatus = remoteTask.getTaskStatus();
+            return taskStatus.getQueuedPartitionedSplitsWeight()
+                    + taskStatus.getRunningPartitionedSplitsWeight()
+                    + assignmentStats.getAssignedSplitsWeightForStage(node);
+        };
+    }
+
     protected Optional<InternalNodeInfo> chooseLeastBusyNode(SplitWeight splitWeight, List<InternalNode> candidateNodes, ToLongFunction<InternalNode> splitWeightProvider, OptionalInt preferredNodeCount, long maxSplitsWeight, NodeAssignmentStats assignmentStats)
     {
         long minWeight = Long.MAX_VALUE;
diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
index b48d486009a40..0925f42733a77 100644
--- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
+++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java
@@ -54,6 +54,7 @@
 import com.facebook.presto.ttl.nodettlfetchermanagers.ThrowingNodeTtlFetcherManager;
 import com.facebook.presto.util.FinalizerService;
 import com.google.common.base.Splitter;
+import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMultimap;
@@ -1136,7 +1137,7 @@ public void testMemoryUsage()
     }
 
     @Test
-    public void testMaxTasksPerStageWittLimit()
+    public void testMaxTasksPerStageWithLimit()
     {
         NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
         TestingTransactionHandle transactionHandle = TestingTransactionHandle.create();
@@ -1241,6 +1242,120 @@ private static Session sessionWithTtlAwareSchedulingStrategyAndEstimatedExecutio
             .build();
     }
 
+    private static Session sessionWithScheduleSplitsBasedOnTaskLoad(boolean scheduleSplitsBasedOnTaskLoad)
+    {
+        return TestingSession.testSessionBuilder()
+                .setSystemProperty("schedule_splits_based_on_task_load", String.valueOf(scheduleSplitsBasedOnTaskLoad))
+                .build();
+    }
+
+    @Test
+    public void testScheduleSplitsBasedOnTaskLoad()
+    {
+        List<RemoteTask> existingTasks = new ArrayList<>();
+        try {
+            // Test with scheduleSplitsBasedOnTaskLoad enabled
+            Session taskLoadSession = sessionWithScheduleSplitsBasedOnTaskLoad(true);
+            NodeSelector taskLoadNodeSelector = nodeScheduler.createNodeSelector(taskLoadSession, CONNECTOR_ID);
+
+            TestingTransactionHandle transactionHandle = TestingTransactionHandle.create();
+            MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor);
+
+            // Create existing tasks with different split weights to test task load selection.
+            // We will have two queries, 'test1' and 'test2'.
+            // 'test1' has more load on node 1 and less on nodes 2 and 3.
+            // 'test2' has more load on nodes 2 and 3 and very little on node 1.
+            // Thus, nodes 2 and 3 carry more total load, but less 'test1' load.
+            Set<InternalNode> nodes = nodeManager.getActiveConnectorNodes(CONNECTOR_ID);
+            Map<InternalNode, MockRemoteTaskFactory.MockRemoteTask> nodeToTaskMap = new HashMap<>();
+            for (InternalNode node : nodes) {
+                int nodeIndex = Integer.parseInt(node.getNodeIdentifier().substring("other".length()));
+
+                // Create tasks for query 'test1' with different loads: task 1 (for node 1) has more load.
+                int initialSplitsCount = (nodeIndex == 1) ? 5 : (nodeIndex == 2) ? 3 : 2; // First task more loaded
+                List<Split> initialSplits = new ArrayList<>();
+                for (int j = 0; j < initialSplitsCount; j++) {
+                    initialSplits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote()));
+                }
+
+                TaskId taskId = new TaskId("test1", 1, 0, nodeIndex, 0);
+                MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(
+                        taskId, node, initialSplits, nodeTaskMap.createTaskStatsTracker(node, taskId));
+                remoteTask.startSplits(1);
+
+                nodeTaskMap.addTask(node, remoteTask);
+                nodeToTaskMap.put(node, remoteTask);
+                existingTasks.add(remoteTask);
+
+                // Create tasks for query 'test2' with different loads: tasks 2 and 3 (for nodes 2 and 3) have more load.
+                initialSplitsCount = (nodeIndex == 1) ? 1 : 7; // First task less loaded
+                initialSplits = new ArrayList<>();
+                for (int j = 0; j < initialSplitsCount; j++) {
+                    initialSplits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote()));
+                }
+
+                taskId = new TaskId("test2", 1, 0, nodeIndex, 0);
+                remoteTask = remoteTaskFactory.createTableScanTask(
+                        taskId, node, initialSplits, nodeTaskMap.createTaskStatsTracker(node, taskId));
+                remoteTask.startSplits(1);
+
+                nodeTaskMap.addTask(node, remoteTask);
+            }
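+
+            // Note: the 'test2' tasks are registered with nodeTaskMap, so they add to each
+            // node's total load, but they are deliberately not added to existingTasks, which
+            // represents the stage (query 'test1') being scheduled here.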
+
+            // Split situation is now the following (initial + second query):
+            //   other1: 5 + 1 = 6
+            //   other2: 3 + 7 = 10
+            //   other3: 2 + 7 = 9
+            // The task-based assignment should pick the nodes where the tasks of the 1st query
+            // have fewer splits, namely nodes 2 and 3, even though they have more splits in total.
+
+            // Create new splits to assign
+            Set<Split> newSplits = new HashSet<>();
+            int numNewSplits = 4;
+            for (int i = 0; i < numNewSplits; i++) {
+                newSplits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote()));
+            }
+
+            // Verify that splits were assigned only to nodes 2 and 3
+            SplitPlacementResult result = taskLoadNodeSelector.computeAssignments(newSplits, existingTasks);
+            Multimap<InternalNode, Split> assignments = result.getAssignments();
+            assertEquals(assignments.size(), numNewSplits);
+            for (InternalNode node : assignments.keySet()) {
+                assertTrue(node.getNodeIdentifier().equals("other2") || node.getNodeIdentifier().equals("other3"));
+            }
+
+            PlanNodeId planNodeId = new PlanNodeId("sourceId");
+            for (InternalNode node : assignments.keySet()) {
+                Multimap<PlanNodeId, Split> splits = ArrayListMultimap.create();
+                for (Split split : assignments.get(node)) {
+                    splits.put(planNodeId, split);
+                }
+                nodeToTaskMap.get(node).addSplits(splits);
+            }
+
+            // Split situation is now the following (initial + second query + assigned), task/node:
+            //   other1: 5 + 1 + 0 -> 5/6
+            //   other2: 3 + 7 + 2 -> 5/12
+            //   other3: 2 + 7 + 2 -> 4/11
+            // The task-based assignment picks the nodes where the tasks of the 1st query have fewer
+            // splits; this time all nodes qualify, as the lightly loaded tasks have caught up with
+            // the heavily loaded one.
+
+            // Verify that splits were assigned to all nodes.
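+            // The same four splits are reused here purely to drive a second scheduling
+            // decision; computeAssignments does not itself attach splits to tasks.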
+            result = taskLoadNodeSelector.computeAssignments(newSplits, existingTasks);
+            assignments = result.getAssignments();
+            assertEquals(assignments.size(), numNewSplits);
+            for (InternalNode node : assignments.keySet()) {
+                assertTrue(node.getNodeIdentifier().equals("other1") || node.getNodeIdentifier().equals("other2") || node.getNodeIdentifier().equals("other3"));
+            }
+        }
+        finally {
+            // Cleanup
+            for (RemoteTask task : existingTasks) {
+                task.abort();
+            }
+        }
+    }
+
     private static PartitionedSplitsInfo standardWeightSplitsInfo(int splitCount)
     {
         return PartitionedSplitsInfo.forSplitCountAndWeightSum(splitCount, SplitWeight.rawValueForStandardSplitCount(splitCount));
diff --git a/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java b/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java
index dc41f7026bdc9..45e3b64b52e57 100644
--- a/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java
+++ b/presto-main-base/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java
@@ -35,6 +35,8 @@ public void testDefaults()
                 .setNetworkTopology(LEGACY)
                 .setMinCandidates(10)
                 .setMaxSplitsPerNode(100)
+                .setMaxSplitsPerTask(10)
+                .setScheduleSplitsBasedOnTaskLoad(false)
                 .setMaxPendingSplitsPerTask(10)
                 .setMaxUnacknowledgedSplitsPerTask(500)
                 .setIncludeCoordinator(true)
@@ -54,6 +56,8 @@ public void testExplicitPropertyMappings()
                 .put("node-scheduler.max-pending-splits-per-task", "11")
                 .put("node-scheduler.max-unacknowledged-splits-per-task", "501")
                 .put("node-scheduler.max-splits-per-node", "101")
+                .put("node-scheduler.max-splits-per-task", "17")
+                .put("node-scheduler.schedule-splits-based-on-task-load", "true")
                 .put("node-scheduler.node-selection-hash-strategy", "CONSISTENT_HASHING")
                 .put("node-scheduler.consistent-hashing-min-virtual-node-count", "2000")
                 .put("experimental.resource-aware-scheduling-strategy", "TTL")
@@ -64,6 +68,8 @@ public void testExplicitPropertyMappings()
                 .setNetworkTopology("flat")
                 .setIncludeCoordinator(false)
                 .setMaxSplitsPerNode(101)
+                .setMaxSplitsPerTask(17)
+                .setScheduleSplitsBasedOnTaskLoad(true)
                 .setMaxPendingSplitsPerTask(11)
                 .setMaxUnacknowledgedSplitsPerTask(501)
                 .setMinCandidates(11)