Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.execution.QueryManagerConfig;
import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.execution.warnings.WarningCollectorConfig;
import com.facebook.presto.execution.warnings.WarningHandlingLevel;
import com.facebook.presto.memory.MemoryManagerConfig;
Expand Down Expand Up @@ -174,12 +175,13 @@ public final class SystemSessionProperties
public static final String SKIP_REDUNDANT_SORT = "skip_redundant_sort";
public static final String ALLOW_WINDOW_ORDER_BY_LITERALS = "allow_window_order_by_literals";
public static final String ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR = "enforce_fixed_distribution_for_output_operator";
public static final String MAX_UNACKNOWLEDGED_SPLITS_PER_TASK = "max_unacknowledged_splits_per_task";

private final List<PropertyMetadata<?>> sessionProperties;

/**
 * No-arg constructor: builds the session properties from default instances of every
 * config class. Delegates to the injectable constructor below.
 */
public SystemSessionProperties()
{
    // NodeSchedulerConfig was added so max_unacknowledged_splits_per_task can pick up its configured default
    this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig(), new NodeMemoryConfig(), new WarningCollectorConfig(), new NodeSchedulerConfig());
}

@Inject
Expand All @@ -189,7 +191,8 @@ public SystemSessionProperties(
MemoryManagerConfig memoryManagerConfig,
FeaturesConfig featuresConfig,
NodeMemoryConfig nodeMemoryConfig,
WarningCollectorConfig warningCollectorConfig)
WarningCollectorConfig warningCollectorConfig,
NodeSchedulerConfig nodeSchedulerConfig)
{
sessionProperties = ImmutableList.of(
stringProperty(
Expand Down Expand Up @@ -908,7 +911,16 @@ public SystemSessionProperties(
ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR,
"Enforce fixed distribution for output operator",
featuresConfig.isEnforceFixedDistributionForOutputOperator(),
true));
true),
new PropertyMetadata<>(
MAX_UNACKNOWLEDGED_SPLITS_PER_TASK,
"Maximum number of leaf splits awaiting delivery to a given task",
INTEGER,
Integer.class,
nodeSchedulerConfig.getMaxUnacknowledgedSplitsPerTask(),
false,
value -> validateIntegerValue(value, MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, 1, false),
object -> object));
}

public static boolean isSkipRedundantSort(Session session)
Expand Down Expand Up @@ -1539,4 +1551,9 @@ public static boolean isEnforceFixedDistributionForOutputOperator(Session sessio
{
return session.getSystemProperty(ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR, Boolean.class);
}

/**
 * Reads the effective {@code max_unacknowledged_splits_per_task} limit for the given session.
 *
 * @param session the session whose system properties are consulted
 * @return the per-task cap on splits assigned but not yet acknowledged (at least 1)
 */
public static int getMaxUnacknowledgedSplitsPerTask(Session session)
{
    Integer maxUnacknowledgedSplits = session.getSystemProperty(MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, Integer.class);
    return maxUnacknowledgedSplits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,6 @@ public interface RemoteTask
int getPartitionedSplitCount();

int getQueuedPartitionedSplitCount();

int getUnacknowledgedPartitionedSplitCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,98 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public final class NodeAssignmentStats
{
private final NodeTaskMap nodeTaskMap;
private final Map<InternalNode, Integer> assignmentCount = new HashMap<>();
private final Map<InternalNode, Integer> splitCountByNode = new HashMap<>();
private final Map<String, Integer> queuedSplitCountByNode = new HashMap<>();
private final Map<InternalNode, Integer> nodeTotalSplitCount;
private final Map<String, PendingSplitInfo> stageQueuedSplitInfo;

public NodeAssignmentStats(NodeTaskMap nodeTaskMap, NodeMap nodeMap, List<RemoteTask> existingTasks)
{
this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null");
int nodeMapSize = requireNonNull(nodeMap, "nodeMap is null").getNodesByHostAndPort().size();
this.nodeTotalSplitCount = new HashMap<>(nodeMapSize);
this.stageQueuedSplitInfo = new HashMap<>(nodeMapSize);

// pre-populate the assignment counts with zeros. This makes getOrDefault() faster
for (InternalNode node : nodeMap.getNodesByHostAndPort().values()) {
assignmentCount.put(node, 0);
for (RemoteTask task : existingTasks) {
checkArgument(stageQueuedSplitInfo.put(task.getNodeId(), new PendingSplitInfo(task.getQueuedPartitionedSplitCount(), task.getUnacknowledgedPartitionedSplitCount())) == null, "A single stage may not have multiple tasks running on the same node");
}

for (RemoteTask task : existingTasks) {
checkArgument(queuedSplitCountByNode.put(task.getNodeId(), task.getQueuedPartitionedSplitCount()) == null, "A single stage may not have multiple tasks running on the same node");
// pre-populate the assignment counts with zeros
if (existingTasks.size() < nodeMapSize) {
Function<String, PendingSplitInfo> createEmptySplitInfo = (ignored) -> new PendingSplitInfo(0, 0);
for (InternalNode node : nodeMap.getNodesByHostAndPort().values()) {
stageQueuedSplitInfo.computeIfAbsent(node.getNodeIdentifier(), createEmptySplitInfo);
}
}
}

public int getTotalSplitCount(InternalNode node)
{
return assignmentCount.getOrDefault(node, 0) + splitCountByNode.computeIfAbsent(node, nodeTaskMap::getPartitionedSplitsOnNode);
int nodeTotalSplits = nodeTotalSplitCount.computeIfAbsent(node, nodeTaskMap::getPartitionedSplitsOnNode);
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
return nodeTotalSplits + (stageInfo == null ? 0 : stageInfo.getAssignedSplitCount());
}

public int getQueuedSplitCountForStage(InternalNode node)
{
return queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0) + assignmentCount.getOrDefault(node, 0);
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
return stageInfo == null ? 0 : stageInfo.getQueuedSplitCount();
}

public int getUnacknowledgedSplitCountForStage(InternalNode node)
{
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier());
return stageInfo == null ? 0 : stageInfo.getUnacknowledgedSplitCount();
}

public void addAssignedSplit(InternalNode node)
{
assignmentCount.merge(node, 1, (x, y) -> x + y);
String nodeId = node.getNodeIdentifier();
// Avoids the extra per-invocation lambda allocation of computeIfAbsent since assigning a split to an existing task more common than creating a new task
PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(nodeId);
if (stageInfo == null) {
stageInfo = new PendingSplitInfo(0, 0);
stageQueuedSplitInfo.put(nodeId, stageInfo);
}
stageInfo.addAssignedSplit();
}

private static final class PendingSplitInfo
{
private final int queuedSplitCount;
private final int unacknowledgedSplitCount;
private int assignedSplits;

private PendingSplitInfo(int queuedSplitCount, int unacknowledgedSplitCount)
{
this.queuedSplitCount = queuedSplitCount;
this.unacknowledgedSplitCount = unacknowledgedSplitCount;
}

public int getAssignedSplitCount()
{
return assignedSplits;
}

public int getQueuedSplitCount()
{
return queuedSplitCount + assignedSplits;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels a bit weird to have getQueuedSplitCount return queuedSplitCount + assignedSplits, but I don't have a better name in mind.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the structure here is a little quirky (but is fundamentally the same as before, just with 1 fewer HashMap). Any split assigned within the current batch is "effectively queued" even though it hasn't made its way to the task yet. I couldn't think of a name that worked any better at making that piece clear.

}

public int getUnacknowledgedSplitCount()
{
return unacknowledgedSplitCount + assignedSplits;
}

public void addAssignedSplit()
{
assignedSplits++;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.Session;
import com.facebook.presto.execution.NodeTaskMap;
import com.facebook.presto.execution.RemoteTask;
import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.Set;

import static com.facebook.airlift.concurrent.MoreFutures.whenAnyCompleteCancelOthers;
import static com.facebook.presto.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask;
import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -132,18 +134,19 @@ public Map<String, CounterStat> getTopologicalSplitCounters()
return counters.build();
}

public NodeSelector createNodeSelector(ConnectorId connectorId)
public NodeSelector createNodeSelector(Session session, ConnectorId connectorId)
{
return createNodeSelector(connectorId, Integer.MAX_VALUE);
return createNodeSelector(session, connectorId, Integer.MAX_VALUE);
}

public NodeSelector createNodeSelector(ConnectorId connectorId, int maxTasksPerStage)
public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, int maxTasksPerStage)
{
// this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
// done as close to when the the split is about to be scheduled
Supplier<NodeMap> nodeMap = nodeMapRefreshInterval.toMillis() > 0 ?
memoizeWithExpiration(createNodeMapSupplier(connectorId), nodeMapRefreshInterval.toMillis(), MILLISECONDS) : createNodeMapSupplier(connectorId);

int maxUnacknowledgedSplitsPerTask = getMaxUnacknowledgedSplitsPerTask(requireNonNull(session, "session is null"));
if (useNetworkTopology) {
return new TopologyAwareNodeSelector(
nodeManager,
Expand All @@ -154,12 +157,13 @@ public NodeSelector createNodeSelector(ConnectorId connectorId, int maxTasksPerS
minCandidates,
maxSplitsPerNode,
maxPendingSplitsPerTask,
maxUnacknowledgedSplitsPerTask,
topologicalSplitCounters,
networkLocationSegmentNames,
networkLocationCache);
}
else {
return new SimpleNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, maxTasksPerStage);
return new SimpleNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask, maxTasksPerStage);
}
}

Expand Down Expand Up @@ -291,6 +295,7 @@ public static SplitPlacementResult selectDistributionNodes(
NodeTaskMap nodeTaskMap,
int maxSplitsPerNode,
int maxPendingSplitsPerTask,
int maxUnacknowledgedSplitsPerTask,
Set<Split> splits,
List<RemoteTask> existingTasks,
BucketNodeMap bucketNodeMap,
Expand All @@ -306,8 +311,7 @@ public static SplitPlacementResult selectDistributionNodes(
boolean isCacheable = bucketNodeMap.isSplitCacheable(split);

// if node is full, don't schedule now, which will push back on the scheduling of splits
if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode ||
assignmentStats.getQueuedSplitCountForStage(node) < maxPendingSplitsPerTask) {
if (!isDistributionNodeFull(assignmentStats, node, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask)) {
if (isCacheable) {
split = new Split(split.getConnectorId(), split.getTransactionHandle(), split.getConnectorSplit(), split.getLifespan(), new SplitContext(true));
nodeSelectionStats.incrementBucketedPreferredNodeSelectedCount();
Expand All @@ -327,6 +331,12 @@ public static SplitPlacementResult selectDistributionNodes(
return new SplitPlacementResult(blocked, ImmutableMultimap.copyOf(assignments));
}

/**
 * Decides whether a node should receive no more splits for this stage: either the
 * unacknowledged-split cap for the stage's task has been reached, or the node is at its
 * overall split limit while the stage's pending queue on that node is also at its limit.
 */
private static boolean isDistributionNodeFull(NodeAssignmentStats assignmentStats, InternalNode node, int maxSplitsPerNode, int maxPendingSplitsPerTask, int maxUnacknowledgedSplitsPerTask)
{
    if (assignmentStats.getUnacknowledgedSplitCountForStage(node) >= maxUnacknowledgedSplitsPerTask) {
        return true;
    }
    boolean nodeAtSplitLimit = assignmentStats.getTotalSplitCount(node) >= maxSplitsPerNode;
    return nodeAtSplitLimit && assignmentStats.getQueuedSplitCountForStage(node) >= maxPendingSplitsPerTask;
}

public static int calculateLowWatermark(int maxPendingSplitsPerTask)
{
return (int) Math.ceil(maxPendingSplitsPerTask / 2.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.execution.scheduler;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.airlift.configuration.DefunctConfig;
import com.facebook.airlift.configuration.LegacyConfig;

Expand All @@ -34,6 +35,7 @@ public static class NetworkTopologyType
private boolean includeCoordinator = true;
private int maxSplitsPerNode = 100;
private int maxPendingSplitsPerTask = 10;
private int maxUnacknowledgedSplitsPerTask = 500;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, what is the rationale behind using 500 as the default value? 🤔

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In an experimental setup I was running with differently (very large) split queue depth configurations and small files (aka: cheap splits that are processed very quickly once delivered), 500 seemed to be about the point of diminishing returns in terms of setting this value higher.

In the original iteration, I had this default to Integer.MAX_VALUE since I don't aim to affect any of the default scheduling behavior with this change, but rather just want to add a safety net for deeper split queues blowing up task update sizes. Dain convinced me to pick a more "reasonable" value that was still above the ceiling of having any effect on the current default configs.

private String networkTopology = NetworkTopologyType.LEGACY;

@NotNull
Expand Down Expand Up @@ -98,4 +100,18 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode)
this.maxSplitsPerNode = maxSplitsPerNode;
return this;
}

// Upper bound on splits assigned to a task but not yet acknowledged by it; must be at least 1
@Min(1)
public int getMaxUnacknowledgedSplitsPerTask()
{
    return maxUnacknowledgedSplitsPerTask;
}

@Config("node-scheduler.max-unacknowledged-splits-per-task")
@ConfigDescription("Maximum number of leaf splits not yet delivered to a given task")
public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledgedSplitsPerTask)
{
    // Fluent setter invoked by the airlift config binder; returns this for chaining
    this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ private StageScheduler createStageScheduler(
connectorId = null;
}

NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId, maxTasksPerStage);
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, connectorId, maxTasksPerStage);
SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);

checkArgument(!plan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution());
Expand All @@ -304,7 +304,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
stageExecution,
sourceTasksProvider,
writerTasksProvider,
nodeScheduler.createNodeSelector(null),
nodeScheduler.createNodeSelector(session, null),
scheduledExecutor,
getWriterMinSize(session),
isOptimizedScaleWriterProducerBuffer(session));
Expand Down Expand Up @@ -343,7 +343,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
.collect(toImmutableList());
}
else {
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(maxTasksPerStage));
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, connectorId).selectRandomNodes(maxTasksPerStage));
}
}
else {
Expand All @@ -368,7 +368,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
bucketNodeMap,
splitBatchSize,
getConcurrentLifespansPerNode(session),
nodeScheduler.createNodeSelector(connectorId),
nodeScheduler.createNodeSelector(session, connectorId),
connectorPartitionHandles);
if (plan.getFragment().getStageExecutionDescriptor().isRecoverableGroupedExecution()) {
stageExecution.registerStageTaskRecoveryCallback(taskId -> {
Expand Down
Loading