presto-docs/src/main/sphinx/admin/properties-session.rst (14 additions)

@@ -434,6 +434,20 @@ Use this to optimize the ``map_filter()`` and ``map_subset()`` functions.

It controls whether subfield access is executed at the data source or not.

``schedule_splits_based_on_task_load``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``boolean``
* **Default value:** ``false``

If set to ``true``, splits are scheduled to tasks based on the task load rather than on the node load.
This is particularly useful for the native worker, as it runs splits for tasks differently than the Java worker.
The corresponding configuration property is :ref:`admin/properties:\`\`node-scheduler.max-splits-per-task\`\``.

Set the property to ``true`` as shown in this example:

``SET SESSION schedule_splits_based_on_task_load=true;``
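
The current value of this property can be checked, along with the other session properties, using ``SHOW SESSION``.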


JDBC Properties
---------------

presto-docs/src/main/sphinx/admin/properties.rst (24 additions)

@@ -725,6 +725,30 @@
due to splits not being balanced across workers. Ideally, it should be set
such that there is always at least one split waiting to be processed, but
not higher.

``node-scheduler.max-splits-per-task``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``integer``
* **Default value:** ``10``

The target value for the number of splits that can be running for
each task, assuming all splits have the standard split weight.

Using a higher value is recommended if task parallelism is higher than 10.
Increasing this value may improve query latency by ensuring that the workers
have enough splits to keep them fully utilized.

When connectors support weight-based split scheduling, the number of splits
assigned depends on the weight of the individual splits. If splits are
small, more of them are allowed to be assigned to each task to compensate.
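
For example, with the default value of ``10``, splits that each weigh half of
the standard split weight allow up to 20 splits to be assigned to a task.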

Setting this too high will waste memory and may result in lower performance
due to splits not being balanced across workers. Ideally, it should be set
such that there is always at least one split waiting to be processed, but
not higher.

The corresponding session property is :ref:`admin/properties-session:\`\`schedule_splits_based_on_task_load\`\``.
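
For example, to enable task-load-based scheduling and raise the per-task split
target, add the following to ``etc/config.properties`` (the values shown are
illustrative, not tuning recommendations):

.. code-block:: none

    node-scheduler.schedule-splits-based-on-task-load=true
    node-scheduler.max-splits-per-task=20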

``node-scheduler.max-pending-splits-per-task``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

SystemSessionProperties.java

@@ -244,6 +244,7 @@ public final class SystemSessionProperties
public static final String AGGREGATION_IF_TO_FILTER_REWRITE_STRATEGY = "aggregation_if_to_filter_rewrite_strategy";
public static final String JOINS_NOT_NULL_INFERENCE_STRATEGY = "joins_not_null_inference_strategy";
public static final String RESOURCE_AWARE_SCHEDULING_STRATEGY = "resource_aware_scheduling_strategy";
public static final String SCHEDULE_SPLITS_BASED_ON_TASK_LOAD = "schedule_splits_based_on_task_load";
public static final String HEAP_DUMP_ON_EXCEEDED_MEMORY_LIMIT_ENABLED = "heap_dump_on_exceeded_memory_limit_enabled";
public static final String EXCEEDED_MEMORY_LIMIT_HEAP_DUMP_FILE_DIRECTORY = "exceeded_memory_limit_heap_dump_file_directory";
public static final String DISTRIBUTED_TRACING_MODE = "distributed_tracing_mode";
@@ -1422,6 +1423,11 @@ public SystemSessionProperties(
false,
value -> ResourceAwareSchedulingStrategy.valueOf(((String) value).toUpperCase()),
ResourceAwareSchedulingStrategy::name),
booleanProperty(
SCHEDULE_SPLITS_BASED_ON_TASK_LOAD,
"Schedule splits based on task load, rather than on the node load.",
nodeSchedulerConfig.isScheduleSplitsBasedOnTaskLoad(),
false),
stringProperty(
ANALYZER_TYPE,
"Analyzer type to use.",
@@ -2917,6 +2923,11 @@ public static ResourceAwareSchedulingStrategy getResourceAwareSchedulingStrategy
return session.getSystemProperty(RESOURCE_AWARE_SCHEDULING_STRATEGY, ResourceAwareSchedulingStrategy.class);
}

public static boolean isScheduleSplitsBasedOnTaskLoad(Session session)
{
return session.getSystemProperty(SCHEDULE_SPLITS_BASED_ON_TASK_LOAD, Boolean.class);
}

public static String getAnalyzerType(Session session)
{
return session.getSystemProperty(ANALYZER_TYPE, String.class);
NodeAssignmentStats.java

@@ -70,6 +70,12 @@ public long getQueuedSplitsWeightForStage(InternalNode node)
return stageInfo == null ? 0 : stageInfo.getQueuedSplitsWeight();
}

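// Returns the split weight currently tracked as assigned to this stage's task on the given node, or 0 if the stage has no task there yet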
public long getAssignedSplitsWeightForStage(InternalNode node)
{
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
return stageInfo == null ? 0 : stageInfo.getAssignedSplitsWeight();
}

public int getUnacknowledgedSplitCountForStage(InternalNode node)
{
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
NodeScheduler.java

@@ -61,6 +61,7 @@
import static com.facebook.airlift.concurrent.MoreFutures.whenAnyCompleteCancelOthers;
import static com.facebook.presto.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask;
import static com.facebook.presto.SystemSessionProperties.getResourceAwareSchedulingStrategy;
import static com.facebook.presto.SystemSessionProperties.isScheduleSplitsBasedOnTaskLoad;
import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType;
import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.ResourceAwareSchedulingStrategy;
import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.ResourceAwareSchedulingStrategy.TTL;
@@ -91,6 +92,7 @@ public class NodeScheduler
private final int minCandidates;
private final boolean includeCoordinator;
private final long maxSplitsWeightPerNode;
private final long maxSplitsWeightPerTask;
private final long maxPendingSplitsWeightPerTask;
private final NodeTaskMap nodeTaskMap;
private final boolean useNetworkTopology;
@@ -146,6 +148,7 @@ public NodeScheduler(
int maxPendingSplitsPerTask = config.getMaxPendingSplitsPerTask();
checkArgument(maxSplitsPerNode >= maxPendingSplitsPerTask, "maxSplitsPerNode must be >= maxPendingSplitsPerTask");
this.maxSplitsWeightPerNode = SplitWeight.rawValueForStandardSplitCount(maxSplitsPerNode);
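// Convert the per-task split-count target into a raw split weight budget for task-load-based scheduling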
this.maxSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(config.getMaxSplitsPerTask());
this.maxPendingSplitsWeightPerTask = SplitWeight.rawValueForStandardSplitCount(maxPendingSplitsPerTask);
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.useNetworkTopology = !config.getNetworkTopology().equals(NetworkTopologyType.LEGACY);
@@ -231,9 +234,11 @@ public NodeSelector createNodeSelector(Session session, ConnectorId connectorId,
nodeSelectionStats,
nodeTaskMap,
includeCoordinator,
isScheduleSplitsBasedOnTaskLoad(session),
nodeMap,
minCandidates,
maxSplitsWeightPerNode,
maxSplitsWeightPerTask,
maxPendingSplitsWeightPerTask,
maxUnacknowledgedSplitsPerTask,
maxTasksPerStage,
NodeSchedulerConfig.java

@@ -33,6 +33,8 @@ public static class NetworkTopologyType
private int minCandidates = 10;
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int maxSplitsPerTask = 10;
private boolean scheduleSplitsBasedOnTaskLoad;
private int maxPendingSplitsPerTask = 10;
private int maxUnacknowledgedSplitsPerTask = 500;
private String networkTopology = NetworkTopologyType.LEGACY;
@@ -106,6 +108,33 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode)
return this;
}

public int getMaxSplitsPerTask()
{
return maxSplitsPerTask;
}

@Config("node-scheduler.max-splits-per-task")
@ConfigDescription("The number of splits weighted at the standard split weight that are allowed to be scheduled for each task " +
"when scheduling splits based on the task load.")
public NodeSchedulerConfig setMaxSplitsPerTask(int maxSplitsPerTask)
{
this.maxSplitsPerTask = maxSplitsPerTask;
return this;
}

public boolean isScheduleSplitsBasedOnTaskLoad()
{
return scheduleSplitsBasedOnTaskLoad;
}

@Config("node-scheduler.schedule-splits-based-on-task-load")
@ConfigDescription("Schedule splits based on task load, rather than on the node load")
public NodeSchedulerConfig setScheduleSplitsBasedOnTaskLoad(boolean scheduleSplitsBasedOnTaskLoad)
{
this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
return this;
}
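
// A rough sketch (not part of this diff) of how these two settings feed the
// scheduler's per-task budget, using only the setters above and the
// SplitWeight.rawValueForStandardSplitCount conversion shown in NodeScheduler;
// the values are illustrative, not tuning recommendations:
//
//   NodeSchedulerConfig config = new NodeSchedulerConfig()
//           .setScheduleSplitsBasedOnTaskLoad(true)
//           .setMaxSplitsPerTask(20);
//   long maxSplitsWeightPerTask =
//           SplitWeight.rawValueForStandardSplitCount(config.getMaxSplitsPerTask());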

@Min(1)
public int getMaxUnacknowledgedSplitsPerTask()
{
SimpleNodeSelector.java

@@ -16,6 +16,7 @@
import com.facebook.airlift.log.Logger;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.TaskStatus;
import com.facebook.presto.execution.scheduler.BucketNodeMap;
import com.facebook.presto.execution.scheduler.InternalNodeInfo;
import com.facebook.presto.execution.scheduler.NodeAssignmentStats;
@@ -35,8 +36,10 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
@@ -71,9 +74,11 @@ public class SimpleNodeSelector
private final NodeSelectionStats nodeSelectionStats;
private final NodeTaskMap nodeTaskMap;
private final boolean includeCoordinator;
private final boolean scheduleSplitsBasedOnTaskLoad;
private final AtomicReference<Supplier<NodeMap>> nodeMap;
private final int minCandidates;
private final long maxSplitsWeightPerNode;
private final long maxSplitsWeightPerTask;
private final long maxPendingSplitsWeightPerTask;
private final int maxUnacknowledgedSplitsPerTask;
private final int maxTasksPerStage;
@@ -84,9 +89,11 @@ public SimpleNodeSelector(
NodeSelectionStats nodeSelectionStats,
NodeTaskMap nodeTaskMap,
boolean includeCoordinator,
boolean scheduleSplitsBasedOnTaskLoad,
Supplier<NodeMap> nodeMap,
int minCandidates,
long maxSplitsWeightPerNode,
long maxSplitsWeightPerTask,
long maxPendingSplitsWeightPerTask,
int maxUnacknowledgedSplitsPerTask,
int maxTasksPerStage,
@@ -96,9 +103,11 @@
this.nodeSelectionStats = requireNonNull(nodeSelectionStats, "nodeSelectionStats is null");
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
this.includeCoordinator = includeCoordinator;
this.scheduleSplitsBasedOnTaskLoad = scheduleSplitsBasedOnTaskLoad;
this.nodeMap = new AtomicReference<>(nodeMap);
this.minCandidates = minCandidates;
this.maxSplitsWeightPerNode = maxSplitsWeightPerNode;
this.maxSplitsWeightPerTask = maxSplitsWeightPerTask;
this.maxPendingSplitsWeightPerTask = maxPendingSplitsWeightPerTask;
this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask);
@@ -149,6 +158,11 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
Set<InternalNode> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;

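// When task-load-based scheduling is enabled, build a weight function that measures the load of each node's task for this stage instead of the node's total load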
Optional<ToLongFunction<InternalNode>> taskLoadSplitWeightProvider = Optional.empty();
if (this.scheduleSplitsBasedOnTaskLoad) {
taskLoadSplitWeightProvider = Optional.of(createTaskLoadSplitWeightProvider(existingTasks, assignmentStats));
}

NodeProvider nodeProvider = nodeMap.getNodeProvider(maxPreferredNodes);
OptionalInt preferredNodeCount = OptionalInt.empty();
for (Split split : splits) {
Expand Down Expand Up @@ -179,9 +193,16 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
}

SplitWeight splitWeight = split.getSplitWeight();
Optional<InternalNodeInfo> chosenNodeInfo = Optional.empty();

if (taskLoadSplitWeightProvider.isPresent()) {
    chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, taskLoadSplitWeightProvider.get(), preferredNodeCount, maxSplitsWeightPerTask, assignmentStats);
}
else {
    chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getTotalSplitsWeight, preferredNodeCount, maxSplitsWeightPerNode, assignmentStats);
    if (!chosenNodeInfo.isPresent()) {
        chosenNodeInfo = chooseLeastBusyNode(splitWeight, candidateNodes, assignmentStats::getQueuedSplitsWeightForStage, preferredNodeCount, maxPendingSplitsWeightPerTask, assignmentStats);
    }
}

if (chosenNodeInfo.isPresent()) {
@@ -223,6 +244,28 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks, BucketNodeMap bucketNodeMap)
return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsWeightPerNode, maxPendingSplitsWeightPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap, nodeSelectionStats);
}

private ToLongFunction<InternalNode> createTaskLoadSplitWeightProvider(List<RemoteTask> existingTasks, NodeAssignmentStats assignmentStats)
{
// Create a map from nodeId to RemoteTask for efficient lookup
Map<String, RemoteTask> tasksByNodeId = new HashMap<>();
for (RemoteTask task : existingTasks) {
tasksByNodeId.put(task.getNodeId(), task);
}

return node -> {
RemoteTask remoteTask = tasksByNodeId.get(node.getNodeIdentifier());
if (remoteTask == null) {
// No task for this node yet; fall back to the weight already assigned to the stage on this node
return assignmentStats.getAssignedSplitsWeightForStage(node);
}

TaskStatus taskStatus = remoteTask.getTaskStatus();
return taskStatus.getQueuedPartitionedSplitsWeight() +
taskStatus.getRunningPartitionedSplitsWeight() +
assignmentStats.getAssignedSplitsWeightForStage(node);
};
}
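
// Worked example (illustrative numbers, not code from this PR): for a node whose
// stage task reports queuedPartitionedSplitsWeight = 300 and
// runningPartitionedSplitsWeight = 500, with another 200 raw weight assigned to
// the stage on that node during the current scheduling round, the provider
// reports a task load of 300 + 500 + 200 = 1000. chooseLeastBusyNode then skips
// this node whenever 1000 plus the incoming split's weight would exceed
// maxSplitsWeightPerTask.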

protected Optional<InternalNodeInfo> chooseLeastBusyNode(SplitWeight splitWeight, List<InternalNode> candidateNodes, ToLongFunction<InternalNode> splitWeightProvider, OptionalInt preferredNodeCount, long maxSplitsWeight, NodeAssignmentStats assignmentStats)
{
long minWeight = Long.MAX_VALUE;