diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 0119f80a7f398..890c325b9c61f 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -16,6 +16,7 @@ import com.facebook.presto.execution.QueryManagerConfig; import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy; import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.execution.warnings.WarningCollectorConfig; import com.facebook.presto.execution.warnings.WarningHandlingLevel; import com.facebook.presto.memory.MemoryManagerConfig; @@ -174,12 +175,13 @@ public final class SystemSessionProperties public static final String SKIP_REDUNDANT_SORT = "skip_redundant_sort"; public static final String ALLOW_WINDOW_ORDER_BY_LITERALS = "allow_window_order_by_literals"; public static final String ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR = "enforce_fixed_distribution_for_output_operator"; + public static final String MAX_UNACKNOWLEDGED_SPLITS_PER_TASK = "max_unacknowledged_splits_per_task"; private final List> sessionProperties; public SystemSessionProperties() { - this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig(), new NodeMemoryConfig(), new WarningCollectorConfig()); + this(new QueryManagerConfig(), new TaskManagerConfig(), new MemoryManagerConfig(), new FeaturesConfig(), new NodeMemoryConfig(), new WarningCollectorConfig(), new NodeSchedulerConfig()); } @Inject @@ -189,7 +191,8 @@ public SystemSessionProperties( MemoryManagerConfig memoryManagerConfig, FeaturesConfig featuresConfig, NodeMemoryConfig nodeMemoryConfig, - WarningCollectorConfig warningCollectorConfig) + WarningCollectorConfig warningCollectorConfig, + NodeSchedulerConfig nodeSchedulerConfig) { sessionProperties = ImmutableList.of( stringProperty( @@ -908,7 +911,16 @@ public SystemSessionProperties( ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR, "Enforce fixed distribution for output operator", featuresConfig.isEnforceFixedDistributionForOutputOperator(), - true)); + true), + new PropertyMetadata<>( + MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, + "Maximum number of leaf splits awaiting delivery to a given task", + INTEGER, + Integer.class, + nodeSchedulerConfig.getMaxUnacknowledgedSplitsPerTask(), + false, + value -> validateIntegerValue(value, MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, 1, false), + object -> object)); } public static boolean isSkipRedundantSort(Session session) @@ -1539,4 +1551,9 @@ public static boolean isEnforceFixedDistributionForOutputOperator(Session sessio { return session.getSystemProperty(ENFORCE_FIXED_DISTRIBUTION_FOR_OUTPUT_OPERATOR, Boolean.class); } + + public static int getMaxUnacknowledgedSplitsPerTask(Session session) + { + return session.getSystemProperty(MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, Integer.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java index 73ea659501e29..8e3e205fff21f 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/RemoteTask.java @@ -73,4 +73,6 @@ public interface RemoteTask int getPartitionedSplitCount(); int getQueuedPartitionedSplitCount(); + + int getUnacknowledgedPartitionedSplitCount(); } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java index 023ea9771307a..5051ba8813024 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeAssignmentStats.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -27,36 +28,90 @@ public final class NodeAssignmentStats { private final NodeTaskMap nodeTaskMap; - private final Map assignmentCount = new HashMap<>(); - private final Map splitCountByNode = new HashMap<>(); - private final Map queuedSplitCountByNode = new HashMap<>(); + private final Map nodeTotalSplitCount; + private final Map stageQueuedSplitInfo; public NodeAssignmentStats(NodeTaskMap nodeTaskMap, NodeMap nodeMap, List existingTasks) { this.nodeTaskMap = requireNonNull(nodeTaskMap, "nodeTaskMap is null"); + int nodeMapSize = requireNonNull(nodeMap, "nodeMap is null").getNodesByHostAndPort().size(); + this.nodeTotalSplitCount = new HashMap<>(nodeMapSize); + this.stageQueuedSplitInfo = new HashMap<>(nodeMapSize); - // pre-populate the assignment counts with zeros. This makes getOrDefault() faster - for (InternalNode node : nodeMap.getNodesByHostAndPort().values()) { - assignmentCount.put(node, 0); + for (RemoteTask task : existingTasks) { + checkArgument(stageQueuedSplitInfo.put(task.getNodeId(), new PendingSplitInfo(task.getQueuedPartitionedSplitCount(), task.getUnacknowledgedPartitionedSplitCount())) == null, "A single stage may not have multiple tasks running on the same node"); } - for (RemoteTask task : existingTasks) { - checkArgument(queuedSplitCountByNode.put(task.getNodeId(), task.getQueuedPartitionedSplitCount()) == null, "A single stage may not have multiple tasks running on the same node"); + // pre-populate the assignment counts with zeros + if (existingTasks.size() < nodeMapSize) { + Function createEmptySplitInfo = (ignored) -> new PendingSplitInfo(0, 0); + for (InternalNode node : nodeMap.getNodesByHostAndPort().values()) { + stageQueuedSplitInfo.computeIfAbsent(node.getNodeIdentifier(), createEmptySplitInfo); + } } } public int getTotalSplitCount(InternalNode node) { - return assignmentCount.getOrDefault(node, 0) + splitCountByNode.computeIfAbsent(node, nodeTaskMap::getPartitionedSplitsOnNode); + int nodeTotalSplits = nodeTotalSplitCount.computeIfAbsent(node, nodeTaskMap::getPartitionedSplitsOnNode); + PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier()); + return nodeTotalSplits + (stageInfo == null ? 0 : stageInfo.getAssignedSplitCount()); } public int getQueuedSplitCountForStage(InternalNode node) { - return queuedSplitCountByNode.getOrDefault(node.getNodeIdentifier(), 0) + assignmentCount.getOrDefault(node, 0); + PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier()); + return stageInfo == null ? 0 : stageInfo.getQueuedSplitCount(); + } + + public int getUnacknowledgedSplitCountForStage(InternalNode node) + { + PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(node.getNodeIdentifier()); + return stageInfo == null ? 0 : stageInfo.getUnacknowledgedSplitCount(); } public void addAssignedSplit(InternalNode node) { - assignmentCount.merge(node, 1, (x, y) -> x + y); + String nodeId = node.getNodeIdentifier(); + // Avoids the extra per-invocation lambda allocation of computeIfAbsent since assigning a split to an existing task more common than creating a new task + PendingSplitInfo stageInfo = stageQueuedSplitInfo.get(nodeId); + if (stageInfo == null) { + stageInfo = new PendingSplitInfo(0, 0); + stageQueuedSplitInfo.put(nodeId, stageInfo); + } + stageInfo.addAssignedSplit(); + } + + private static final class PendingSplitInfo + { + private final int queuedSplitCount; + private final int unacknowledgedSplitCount; + private int assignedSplits; + + private PendingSplitInfo(int queuedSplitCount, int unacknowledgedSplitCount) + { + this.queuedSplitCount = queuedSplitCount; + this.unacknowledgedSplitCount = unacknowledgedSplitCount; + } + + public int getAssignedSplitCount() + { + return assignedSplits; + } + + public int getQueuedSplitCount() + { + return queuedSplitCount + assignedSplits; + } + + public int getUnacknowledgedSplitCount() + { + return unacknowledgedSplitCount + assignedSplits; + } + + public void addAssignedSplit() + { + assignedSplits++; + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java index ac1f0ef206a3a..323765b29725e 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java @@ -14,6 +14,7 @@ package com.facebook.presto.execution.scheduler; import com.facebook.airlift.stats.CounterStat; +import com.facebook.presto.Session; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.RemoteTask; import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats; @@ -50,6 +51,7 @@ import java.util.Set; import static com.facebook.airlift.concurrent.MoreFutures.whenAnyCompleteCancelOthers; +import static com.facebook.presto.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask; import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType; import static com.facebook.presto.spi.NodeState.ACTIVE; import static com.google.common.base.Preconditions.checkArgument; @@ -132,18 +134,19 @@ public Map getTopologicalSplitCounters() return counters.build(); } - public NodeSelector createNodeSelector(ConnectorId connectorId) + public NodeSelector createNodeSelector(Session session, ConnectorId connectorId) { - return createNodeSelector(connectorId, Integer.MAX_VALUE); + return createNodeSelector(session, connectorId, Integer.MAX_VALUE); } - public NodeSelector createNodeSelector(ConnectorId connectorId, int maxTasksPerStage) + public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, int maxTasksPerStage) { // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be // done as close to when the the split is about to be scheduled Supplier nodeMap = nodeMapRefreshInterval.toMillis() > 0 ? memoizeWithExpiration(createNodeMapSupplier(connectorId), nodeMapRefreshInterval.toMillis(), MILLISECONDS) : createNodeMapSupplier(connectorId); + int maxUnacknowledgedSplitsPerTask = getMaxUnacknowledgedSplitsPerTask(requireNonNull(session, "session is null")); if (useNetworkTopology) { return new TopologyAwareNodeSelector( nodeManager, @@ -154,12 +157,13 @@ public NodeSelector createNodeSelector(ConnectorId connectorId, int maxTasksPerS minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, + maxUnacknowledgedSplitsPerTask, topologicalSplitCounters, networkLocationSegmentNames, networkLocationCache); } else { - return new SimpleNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, maxTasksPerStage); + return new SimpleNodeSelector(nodeManager, nodeSelectionStats, nodeTaskMap, includeCoordinator, nodeMap, minCandidates, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask, maxTasksPerStage); } } @@ -291,6 +295,7 @@ public static SplitPlacementResult selectDistributionNodes( NodeTaskMap nodeTaskMap, int maxSplitsPerNode, int maxPendingSplitsPerTask, + int maxUnacknowledgedSplitsPerTask, Set splits, List existingTasks, BucketNodeMap bucketNodeMap, @@ -306,8 +311,7 @@ public static SplitPlacementResult selectDistributionNodes( boolean isCacheable = bucketNodeMap.isSplitCacheable(split); // if node is full, don't schedule now, which will push back on the scheduling of splits - if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode || - assignmentStats.getQueuedSplitCountForStage(node) < maxPendingSplitsPerTask) { + if (!isDistributionNodeFull(assignmentStats, node, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask)) { if (isCacheable) { split = new Split(split.getConnectorId(), split.getTransactionHandle(), split.getConnectorSplit(), split.getLifespan(), new SplitContext(true)); nodeSelectionStats.incrementBucketedPreferredNodeSelectedCount(); @@ -327,6 +331,12 @@ public static SplitPlacementResult selectDistributionNodes( return new SplitPlacementResult(blocked, ImmutableMultimap.copyOf(assignments)); } + private static boolean isDistributionNodeFull(NodeAssignmentStats assignmentStats, InternalNode node, int maxSplitsPerNode, int maxPendingSplitsPerTask, int maxUnacknowledgedSplitsPerTask) + { + return assignmentStats.getUnacknowledgedSplitCountForStage(node) >= maxUnacknowledgedSplitsPerTask || + (assignmentStats.getTotalSplitCount(node) >= maxSplitsPerNode && assignmentStats.getQueuedSplitCountForStage(node) >= maxPendingSplitsPerTask); + } + public static int calculateLowWatermark(int maxPendingSplitsPerTask) { return (int) Math.ceil(maxPendingSplitsPerTask / 2.0); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java index 08044717a3ea0..b79db3961b573 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeSchedulerConfig.java @@ -14,6 +14,7 @@ package com.facebook.presto.execution.scheduler; import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; import com.facebook.airlift.configuration.DefunctConfig; import com.facebook.airlift.configuration.LegacyConfig; @@ -34,6 +35,7 @@ public static class NetworkTopologyType private boolean includeCoordinator = true; private int maxSplitsPerNode = 100; private int maxPendingSplitsPerTask = 10; + private int maxUnacknowledgedSplitsPerTask = 500; private String networkTopology = NetworkTopologyType.LEGACY; @NotNull @@ -98,4 +100,18 @@ public NodeSchedulerConfig setMaxSplitsPerNode(int maxSplitsPerNode) this.maxSplitsPerNode = maxSplitsPerNode; return this; } + + @Min(1) + public int getMaxUnacknowledgedSplitsPerTask() + { + return maxUnacknowledgedSplitsPerTask; + } + + @Config("node-scheduler.max-unacknowledged-splits-per-task") + @ConfigDescription("Maximum number of leaf splits not yet delivered to a given task") + public NodeSchedulerConfig setMaxUnacknowledgedSplitsPerTask(int maxUnacknowledgedSplitsPerTask) + { + this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java index 05983333e608f..52e3d521cfe5d 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java @@ -283,7 +283,7 @@ private StageScheduler createStageScheduler( connectorId = null; } - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(connectorId, maxTasksPerStage); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, connectorId, maxTasksPerStage); SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks); checkArgument(!plan.getFragment().getStageExecutionDescriptor().isStageGroupedExecution()); @@ -304,7 +304,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { stageExecution, sourceTasksProvider, writerTasksProvider, - nodeScheduler.createNodeSelector(null), + nodeScheduler.createNodeSelector(session, null), scheduledExecutor, getWriterMinSize(session), isOptimizedScaleWriterProducerBuffer(session)); @@ -343,7 +343,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { .collect(toImmutableList()); } else { - stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(maxTasksPerStage)); + stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, connectorId).selectRandomNodes(maxTasksPerStage)); } } else { @@ -368,7 +368,7 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) { bucketNodeMap, splitBatchSize, getConcurrentLifespansPerNode(session), - nodeScheduler.createNodeSelector(connectorId), + nodeScheduler.createNodeSelector(session, connectorId), connectorPartitionHandles); if (plan.getFragment().getStageExecutionDescriptor().isRecoverableGroupedExecution()) { stageExecution.registerStageTaskRecoveryCallback(taskId -> { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java index 4c3311d03f800..a554860b47f3e 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/SimpleNodeSelector.java @@ -27,7 +27,6 @@ import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.SplitContext; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.HashMultimap; @@ -41,6 +40,7 @@ import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.ToIntFunction; import static com.facebook.presto.execution.scheduler.NodeScheduler.calculateLowWatermark; import static com.facebook.presto.execution.scheduler.NodeScheduler.randomizedNodes; @@ -52,6 +52,7 @@ import static com.facebook.presto.spi.StandardErrorCode.NODE_SELECTION_NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY; +import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; import static java.util.Objects.requireNonNull; @@ -68,6 +69,7 @@ public class SimpleNodeSelector private final int minCandidates; private final int maxSplitsPerNode; private final int maxPendingSplitsPerTask; + private final int maxUnacknowledgedSplitsPerTask; private final int maxTasksPerStage; public SimpleNodeSelector( @@ -79,6 +81,7 @@ public SimpleNodeSelector( int minCandidates, int maxSplitsPerNode, int maxPendingSplitsPerTask, + int maxUnacknowledgedSplitsPerTask, int maxTasksPerStage) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); @@ -89,6 +92,8 @@ public SimpleNodeSelector( this.minCandidates = minCandidates; this.maxSplitsPerNode = maxSplitsPerNode; this.maxPendingSplitsPerTask = maxPendingSplitsPerTask; + this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask; + checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask); this.maxTasksPerStage = maxTasksPerStage; } @@ -128,17 +133,22 @@ public SplitPlacementResult computeAssignments(Set splits, List blockedExactNodes = new HashSet<>(); boolean splitWaitingForAnyNode = false; - // todo identify if sorting will cause bottleneck - List sortedCandidates = sortedNodes(nodeMap); + List sortedCandidates = null; OptionalInt preferredNodeCount = OptionalInt.empty(); for (Split split : splits) { List candidateNodes; switch (split.getNodeSelectionStrategy()) { case HARD_AFFINITY: + if (sortedCandidates == null) { + sortedCandidates = sortedNodes(nodeMap); + } candidateNodes = selectExactNodes(nodeMap, split.getPreferredNodes(sortedCandidates), includeCoordinator); preferredNodeCount = OptionalInt.of(candidateNodes.size()); break; case SOFT_AFFINITY: + if (sortedCandidates == null) { + sortedCandidates = sortedNodes(nodeMap); + } candidateNodes = selectExactNodes(nodeMap, split.getPreferredNodes(sortedCandidates), includeCoordinator); preferredNodeCount = OptionalInt.of(candidateNodes.size()); candidateNodes = ImmutableList.builder().addAll(candidateNodes).addAll(randomNodeSelection.pickNodes(split)).build(); @@ -155,9 +165,9 @@ public SplitPlacementResult computeAssignments(Set splits, List chosenNodeInfo = chooseLeastBusyNode(candidateNodes, assignmentStats::getTotalSplitCount, preferredNodeCount, maxSplitsPerNode); + Optional chosenNodeInfo = chooseLeastBusyNode(candidateNodes, assignmentStats::getTotalSplitCount, preferredNodeCount, maxSplitsPerNode, assignmentStats); if (!chosenNodeInfo.isPresent()) { - chosenNodeInfo = chooseLeastBusyNode(candidateNodes, assignmentStats::getQueuedSplitCountForStage, preferredNodeCount, maxPendingSplitsPerTask); + chosenNodeInfo = chooseLeastBusyNode(candidateNodes, assignmentStats::getQueuedSplitCountForStage, preferredNodeCount, maxPendingSplitsPerTask, assignmentStats); } if (chosenNodeInfo.isPresent()) { @@ -196,16 +206,19 @@ else if (!splitWaitingForAnyNode) { @Override public SplitPlacementResult computeAssignments(Set splits, List existingTasks, BucketNodeMap bucketNodeMap) { - return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsPerNode, maxPendingSplitsPerTask, splits, existingTasks, bucketNodeMap, nodeSelectionStats); + return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap, nodeSelectionStats); } - private Optional chooseLeastBusyNode(List candidateNodes, Function splitCountProvider, OptionalInt preferredNodeCount, int maxSplitCount) + private Optional chooseLeastBusyNode(List candidateNodes, ToIntFunction splitCountProvider, OptionalInt preferredNodeCount, int maxSplitCount, NodeAssignmentStats assignmentStats) { int min = Integer.MAX_VALUE; InternalNode chosenNode = null; for (int i = 0; i < candidateNodes.size(); i++) { InternalNode node = candidateNodes.get(i); - int splitCount = splitCountProvider.apply(node); + if (assignmentStats.getUnacknowledgedSplitCountForStage(node) >= maxUnacknowledgedSplitsPerTask) { + continue; + } + int splitCount = splitCountProvider.applyAsInt(node); // choose the preferred node first as long as they're not busy if (preferredNodeCount.isPresent() && i < preferredNodeCount.getAsInt() && splitCount < maxSplitCount) { diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/TopologyAwareNodeSelector.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/TopologyAwareNodeSelector.java index f168f86b29b8e..f0dc73815870c 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/TopologyAwareNodeSelector.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/nodeSelection/TopologyAwareNodeSelector.java @@ -54,6 +54,7 @@ import static com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionUtils.sortedNodes; import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY; +import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; public class TopologyAwareNodeSelector @@ -69,6 +70,7 @@ public class TopologyAwareNodeSelector private final int minCandidates; private final int maxSplitsPerNode; private final int maxPendingSplitsPerTask; + private final int maxUnacknowledgedSplitsPerTask; private final List topologicalSplitCounters; private final List networkLocationSegmentNames; private final NetworkLocationCache networkLocationCache; @@ -82,6 +84,7 @@ public TopologyAwareNodeSelector( int minCandidates, int maxSplitsPerNode, int maxPendingSplitsPerTask, + int maxUnacknowledgedSplitsPerTask, List topologicalSplitCounters, List networkLocationSegmentNames, NetworkLocationCache networkLocationCache) @@ -94,6 +97,8 @@ public TopologyAwareNodeSelector( this.minCandidates = minCandidates; this.maxSplitsPerNode = maxSplitsPerNode; this.maxPendingSplitsPerTask = maxPendingSplitsPerTask; + this.maxUnacknowledgedSplitsPerTask = maxUnacknowledgedSplitsPerTask; + checkArgument(maxUnacknowledgedSplitsPerTask > 0, "maxUnacknowledgedSplitsPerTask must be > 0, found: %s", maxUnacknowledgedSplitsPerTask); this.topologicalSplitCounters = requireNonNull(topologicalSplitCounters, "topologicalSplitCounters is null"); this.networkLocationSegmentNames = requireNonNull(networkLocationSegmentNames, "networkLocationSegmentNames is null"); this.networkLocationCache = requireNonNull(networkLocationCache, "networkLocationCache is null"); @@ -235,7 +240,7 @@ private int calculateMaxPendingSplits(int splitAffinity, int totalDepth) @Override public SplitPlacementResult computeAssignments(Set splits, List existingTasks, BucketNodeMap bucketNodeMap) { - return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsPerNode, maxPendingSplitsPerTask, splits, existingTasks, bucketNodeMap, nodeSelectionStats); + return selectDistributionNodes(nodeMap.get().get(), nodeTaskMap, maxSplitsPerNode, maxPendingSplitsPerTask, maxUnacknowledgedSplitsPerTask, splits, existingTasks, bucketNodeMap, nodeSelectionStats); } @Nullable @@ -247,12 +252,13 @@ private InternalNode bestNodeSplitCount(Iterator candidates, int m while (candidates.hasNext() && (fullCandidatesConsidered < minCandidatesWhenFull || bestQueueNotFull == null)) { InternalNode node = candidates.next(); - if (assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode) { + boolean hasUnacknowledgedSplitSpace = assignmentStats.getUnacknowledgedSplitCountForStage(node) < maxUnacknowledgedSplitsPerTask; + if (hasUnacknowledgedSplitSpace && assignmentStats.getTotalSplitCount(node) < maxSplitsPerNode) { return node; } fullCandidatesConsidered++; int totalSplitCount = assignmentStats.getQueuedSplitCountForStage(node); - if (totalSplitCount < min && totalSplitCount < maxPendingSplitsPerTask) { + if (hasUnacknowledgedSplitSpace && totalSplitCount < min && totalSplitCount < maxPendingSplitsPerTask) { min = totalSplitCount; bestQueueNotFull = node; } diff --git a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java index b0049a49d3b37..a4d37cc8be304 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java +++ b/presto-main/src/main/java/com/facebook/presto/server/remotetask/HttpRemoteTask.java @@ -98,6 +98,7 @@ import static com.facebook.airlift.http.client.Request.Builder.preparePost; import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler; +import static com.facebook.presto.SystemSessionProperties.getMaxUnacknowledgedSplitsPerTask; import static com.facebook.presto.execution.TaskInfo.createInitialTask; import static com.facebook.presto.execution.TaskState.ABORTED; import static com.facebook.presto.execution.TaskState.FAILED; @@ -196,6 +197,7 @@ public final class HttpRemoteTask private final boolean thriftTransportEnabled; private final Protocol thriftProtocol; private final int maxTaskUpdateSizeInBytes; + private final int maxUnacknowledgedSplits; private final TableWriteInfo tableWriteInfo; @@ -278,6 +280,8 @@ public HttpRemoteTask( this.thriftProtocol = thriftProtocol; this.tableWriteInfo = tableWriteInfo; this.maxTaskUpdateSizeInBytes = maxTaskUpdateSizeInBytes; + this.maxUnacknowledgedSplits = getMaxUnacknowledgedSplitsPerTask(session); + checkArgument(maxUnacknowledgedSplits > 0, "maxUnacknowledgedSplits must be > 0, found: %s", maxUnacknowledgedSplits); this.tableScanPlanNodeIds = ImmutableSet.copyOf(planFragment.getTableScanSchedulingOrder()); this.remoteSourcePlanNodeIds = planFragment.getRemoteSourceNodes().stream() @@ -555,6 +559,12 @@ public int getQueuedPartitionedSplitCount() return getPendingSourceSplitCount() + taskStatus.getQueuedPartitionedDrivers(); } + @Override + public int getUnacknowledgedPartitionedSplitCount() + { + return getPendingSourceSplitCount(); + } + @SuppressWarnings("FieldAccessNotGuarded") private int getPendingSourceSplitCount() { @@ -593,11 +603,11 @@ public synchronized ListenableFuture whenSplitQueueHasSpace(int threshold) private synchronized void updateSplitQueueSpace() { - if (!whenSplitQueueHasSpaceThreshold.isPresent()) { - return; - } - splitQueueHasSpace = getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt(); - if (splitQueueHasSpace) { + // Must check whether the unacknowledged split count threshold is reached even without listeners registered yet + splitQueueHasSpace = getUnacknowledgedPartitionedSplitCount() < maxUnacknowledgedSplits && + (!whenSplitQueueHasSpaceThreshold.isPresent() || getQueuedPartitionedSplitCount() < whenSplitQueueHasSpaceThreshold.getAsInt()); + // Only trigger notifications if a listener might be registered + if (splitQueueHasSpace && whenSplitQueueHasSpaceThreshold.isPresent()) { whenSplitQueueHasSpace.complete(null, executor); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java index 1d93a12afb3a8..9089d295767e0 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java @@ -139,7 +139,7 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand break; case NO_PREFERENCE: bucketToNode = createArbitraryBucketToNode( - nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(getMaxTasksPerStage(session)), + nodeScheduler.createNodeSelector(session, connectorId).selectRandomNodes(getMaxTasksPerStage(session)), connectorBucketNodeMap.getBucketCount()); cacheable = false; break; @@ -187,7 +187,7 @@ public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partit return new FixedBucketNodeMap( getSplitToBucket(session, partitioningHandle), createArbitraryBucketToNode( - nodeScheduler.createNodeSelector(partitioningHandle.getConnectorId().get()).selectRandomNodes(getMaxTasksPerStage(session)), + nodeScheduler.createNodeSelector(session, partitioningHandle.getConnectorId().get()).selectRandomNodes(getMaxTasksPerStage(session)), connectorBucketNodeMap.getBucketCount()), false); default: @@ -207,7 +207,7 @@ private ConnectorBucketNodeMap getConnectorBucketNodeMap(Session session, Partit checkArgument(!(partitioningHandle.getConnectorHandle() instanceof SystemPartitioningHandle)); ConnectorId connectorId = partitioningHandle.getConnectorId() .orElseThrow(() -> new IllegalArgumentException("No connector ID for partitioning handle: " + partitioningHandle)); - List sortedNodes = sortedNodes(connectorId); + List sortedNodes = sortedNodes(session, connectorId); ConnectorNodePartitioningProvider partitioningProvider = partitioningProviderManager.getPartitioningProvider(partitioningHandle.getConnectorId().get()); @@ -258,9 +258,9 @@ private static List createArbitraryBucketToNode(List return distribution.build(); } - public List sortedNodes(ConnectorId connectorId) + public List sortedNodes(Session session, ConnectorId connectorId) { - List nodes = nodeScheduler.createNodeSelector(connectorId).allNodes(); + List nodes = nodeScheduler.createNodeSelector(session, connectorId).allNodes(); return nodes.stream().sorted(comparing(InternalNode::getNodeIdentifier)).collect(toImmutableList()); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java index e8eb5d3ff463d..28c35b338283a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/SystemPartitioningHandle.java @@ -138,7 +138,7 @@ public String toString() public NodePartitionMap getNodePartitionMap(Session session, NodeScheduler nodeScheduler) { - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(null); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, null); List nodes; if (partitioning == SystemPartitioning.COORDINATOR_ONLY) { nodes = ImmutableList.of(nodeSelector.selectCurrentNode()); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 0543463352fb2..015b8a073f8c7 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -352,7 +352,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig, new MemoryManagerConfig(), featuresConfig, new NodeMemoryConfig(), - new WarningCollectorConfig())), + new WarningCollectorConfig(), + new NodeSchedulerConfig())), new SchemaPropertyManager(), new TablePropertyManager(), new ColumnPropertyManager(), diff --git a/presto-main/src/test/java/com/facebook/presto/execution/BenchmarkNodeScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/BenchmarkNodeScheduler.java index a6a9fdc8c3123..b0bf31b2ab356 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/BenchmarkNodeScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/BenchmarkNodeScheduler.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.execution; +import com.facebook.presto.Session; import com.facebook.presto.client.NodeVersion; import com.facebook.presto.execution.scheduler.FlatNetworkTopology; import com.facebook.presto.execution.scheduler.LegacyNetworkTopology; @@ -30,6 +31,7 @@ import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.facebook.presto.testing.TestingSession; import com.facebook.presto.testing.TestingTransactionHandle; import com.facebook.presto.util.FinalizerService; import com.google.common.base.Splitter; @@ -68,6 +70,7 @@ import java.util.concurrent.TimeUnit; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static com.facebook.presto.SystemSessionProperties.MAX_UNACKNOWLEDGED_SPLITS_PER_TASK; import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.BENCHMARK; import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.FLAT; import static com.facebook.presto.execution.scheduler.NodeSchedulerConfig.NetworkTopologyType.LEGACY; @@ -174,7 +177,10 @@ public void setup() InMemoryNodeManager nodeManager = new InMemoryNodeManager(); nodeManager.addNode(CONNECTOR_ID, nodes); NodeScheduler nodeScheduler = new NodeScheduler(getNetworkTopology(), nodeManager, new NodeSelectionStats(), getNodeSchedulerConfig(), nodeTaskMap); - nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID); + Session session = TestingSession.testSessionBuilder() + .setSystemProperty(MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, Integer.toString(Integer.MAX_VALUE)) + .build(); + nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID); } @TearDown diff --git a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java index b221200c4f444..a3002559f1238 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/MockRemoteTaskFactory.java @@ -85,6 +85,7 @@ import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; import static com.facebook.presto.util.Failures.toFailures; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.GIGABYTE; @@ -180,6 +181,11 @@ public static final class MockRemoteTask @GuardedBy("this") private int runningDrivers; + @GuardedBy("this") + private int maxUnacknowledgedSplits = Integer.MAX_VALUE; + @GuardedBy("this") + private int unacknowledgedSplits; + @GuardedBy("this") private SettableFuture whenSplitQueueHasSpace = SettableFuture.create(); @@ -339,7 +345,7 @@ private void updateTaskStats() private synchronized void updateSplitQueueSpace() { - if (getQueuedPartitionedSplitCount() < 9) { + if (unacknowledgedSplits < maxUnacknowledgedSplits && getQueuedPartitionedSplitCount() < 9) { if (!whenSplitQueueHasSpace.isDone()) { whenSplitQueueHasSpace.set(null); } @@ -366,6 +372,7 @@ public synchronized void finishSplits(int splits) public synchronized void clearSplits() { + unacknowledgedSplits = 0; splits.clear(); updateTaskStats(); runningDrivers = 0; @@ -379,6 +386,20 @@ public synchronized void startSplits(int maxRunning) updateSplitQueueSpace(); } + public synchronized void setMaxUnacknowledgedSplits(int maxUnacknowledgedSplits) + { + checkArgument(maxUnacknowledgedSplits > 0); + this.maxUnacknowledgedSplits = maxUnacknowledgedSplits; + updateSplitQueueSpace(); + } + + public synchronized void setUnacknowledgedSplits(int unacknowledgedSplits) + { + checkArgument(unacknowledgedSplits >= 0); + this.unacknowledgedSplits = unacknowledgedSplits; + updateSplitQueueSpace(); + } + @Override public void start() { @@ -495,5 +516,11 @@ public synchronized int getQueuedPartitionedSplitCount() } return getPartitionedSplitCount() - runningDrivers; } + + @Override + public synchronized int getUnacknowledgedPartitionedSplitCount() + { + return unacknowledgedSplits; + } } } diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java index 1385754d320c4..3050755ec0d1e 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeScheduler.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.execution; +import com.facebook.presto.Session; import com.facebook.presto.client.NodeVersion; import com.facebook.presto.execution.scheduler.LegacyNetworkTopology; import com.facebook.presto.execution.scheduler.NetworkLocation; @@ -31,6 +32,7 @@ import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.facebook.presto.testing.TestingSession; import com.facebook.presto.testing.TestingTransactionHandle; import com.facebook.presto.util.FinalizerService; import com.google.common.base.Splitter; @@ -59,6 +61,7 @@ import java.util.concurrent.ThreadLocalRandom; import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; +import static com.facebook.presto.SystemSessionProperties.MAX_UNACKNOWLEDGED_SPLITS_PER_TASK; import static com.facebook.presto.execution.scheduler.NetworkLocation.ROOT_LOCATION; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.HARD_AFFINITY; import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; @@ -79,14 +82,18 @@ public class TestNodeScheduler private FinalizerService finalizerService; private NodeTaskMap nodeTaskMap; private InMemoryNodeManager nodeManager; + private NodeSchedulerConfig nodeSchedulerConfig; + private NodeScheduler nodeScheduler; private NodeSelector nodeSelector; private Map taskMap; private ExecutorService remoteTaskExecutor; private ScheduledExecutorService remoteTaskScheduledExecutor; + private Session session; @BeforeMethod public void setUp() { + session = TestingSession.testSessionBuilder().build(); finalizerService = new FinalizerService(); nodeTaskMap = new NodeTaskMap(finalizerService); nodeManager = new InMemoryNodeManager(); @@ -97,15 +104,15 @@ public void setUp() nodeBuilder.add(new InternalNode("other3", URI.create("http://127.0.0.1:13"), NodeVersion.UNKNOWN, false)); List nodes = nodeBuilder.build(); nodeManager.addNode(CONNECTOR_ID, nodes); - NodeSchedulerConfig nodeSchedulerConfig = new NodeSchedulerConfig() + nodeSchedulerConfig = new NodeSchedulerConfig() .setMaxSplitsPerNode(20) .setIncludeCoordinator(false) .setMaxPendingSplitsPerTask(10); - NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); + nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); // contents of taskMap indicate the node-task map for the current stage taskMap = new HashMap<>(); - nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID); + nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID); remoteTaskExecutor = newCachedThreadPool(daemonThreadsNamed("remoteTaskExecutor-%s")); remoteTaskScheduledExecutor = newScheduledThreadPool(2, daemonThreadsNamed("remoteTaskScheduledExecutor-%s")); @@ -118,6 +125,9 @@ public void tearDown() remoteTaskExecutor.shutdown(); remoteTaskScheduledExecutor.shutdown(); finalizerService.destroy(); + nodeSchedulerConfig = null; + nodeScheduler = null; + nodeSelector = null; } @Test @@ -170,7 +180,7 @@ public NetworkLocation get(HostAddress host) } }; NodeScheduler nodeScheduler = new NodeScheduler(locationCache, topology, nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap, new Duration(5, SECONDS)); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID); // Fill up the nodes with non-local data ImmutableSet.Builder nonRackLocalBuilder = ImmutableSet.builder(); @@ -294,6 +304,25 @@ public void testBasicAssignment() } } + @Test + public void testBasicAssignmentMaxUnacknowledgedSplitsPerTask() + { + // Use non-default max unacknowledged splits per task + nodeSelector = nodeScheduler.createNodeSelector(sessionWithMaxUnacknowledgedSplitsPerTask(1), CONNECTOR_ID, Integer.MAX_VALUE); + // One split for each node, and one extra split that can't be placed + int nodeCount = nodeManager.getActiveConnectorNodes(CONNECTOR_ID).size(); + int splitCount = nodeCount + 1; + Set splits = new HashSet<>(); + for (int i = 0; i < splitCount; i++) { + splits.add(new Split(CONNECTOR_ID, TestingTransactionHandle.create(), new TestSplitRemote())); + } + Multimap assignments = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values())).getAssignments(); + assertEquals(assignments.entries().size(), nodeCount); + for (InternalNode node : nodeManager.getActiveConnectorNodes(CONNECTOR_ID)) { + assertTrue(assignments.keySet().contains(node)); + } + } + @Test public void testAffinityAssignmentNotSupported() { @@ -305,7 +334,7 @@ public void testAffinityAssignmentNotSupported() .setMaxPendingSplitsPerTask(10); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID, 2); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID, 2); Set splits = new HashSet<>(); @@ -328,7 +357,7 @@ public void testAffinityAssignment() .setMaxPendingSplitsPerTask(10); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID, 3); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID, 3); Set splits = new HashSet<>(); @@ -366,7 +395,7 @@ public void testHardAffinityAssignment() .setMaxPendingSplitsPerTask(10); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID, 3); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID, 3); Set splits = new HashSet<>(); @@ -465,6 +494,66 @@ public void testMaxSplitsPerNodePerTask() assertEquals(nodeTaskMap.getPartitionedSplitsOnNode(newNode), 0); } + @Test + public void testMaxUnacknowledgedSplitsPerTask() + { + int maxUnacknowledgedSplitsPerTask = 5; + nodeSelector = nodeScheduler.createNodeSelector(sessionWithMaxUnacknowledgedSplitsPerTask(maxUnacknowledgedSplitsPerTask), CONNECTOR_ID, Integer.MAX_VALUE); + + TestingTransactionHandle transactionHandle = TestingTransactionHandle.create(); + ImmutableList.Builder initialSplits = ImmutableList.builder(); + for (int i = 0; i < maxUnacknowledgedSplitsPerTask; i++) { + initialSplits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote())); + } + + List nodes = new ArrayList<>(); + List tasks = new ArrayList<>(); + MockRemoteTaskFactory remoteTaskFactory = new MockRemoteTaskFactory(remoteTaskExecutor, remoteTaskScheduledExecutor); + int counter = 1; + for (InternalNode node : nodeManager.getActiveConnectorNodes(CONNECTOR_ID)) { + // Max out number of unacknowledged splits on each task + TaskId taskId = new TaskId("test", 1, 0, counter); + counter++; + MockRemoteTaskFactory.MockRemoteTask remoteTask = remoteTaskFactory.createTableScanTask(taskId, node, initialSplits.build(), nodeTaskMap.createTaskStatsTracker(node, taskId)); + nodeTaskMap.addTask(node, remoteTask); + remoteTask.setMaxUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask); + remoteTask.setUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask); + nodes.add(node); + tasks.add(remoteTask); + } + + // One split per node + Set splits = new HashSet<>(); + for (int i = 0; i < nodes.size(); i++) { + splits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote())); + } + SplitPlacementResult splitPlacements = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(tasks)); + // No splits should have been placed, max unacknowledged was already reached + assertEquals(splitPlacements.getAssignments().size(), 0); + assertFalse(splitPlacements.getBlocked().isDone()); + + // Unblock one task + MockRemoteTaskFactory.MockRemoteTask taskOne = tasks.get(0); + taskOne.finishSplits(1); + taskOne.setUnacknowledgedSplits(taskOne.getUnacknowledgedPartitionedSplitCount() - 1); + assertTrue(splitPlacements.getBlocked().isDone()); + + // Attempt to schedule again, only the node with the unblocked task should be chosen + splitPlacements = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(tasks)); + assertEquals(splitPlacements.getAssignments().size(), 1); + assertTrue(splitPlacements.getAssignments().keySet().contains(nodes.get(0))); + + // Make the first node appear to have no splits, unacknowledged splits alone should force the splits to be spread across nodes + taskOne.clearSplits(); + // Give all tasks with room for 1 unacknowledged split + tasks.forEach(task -> task.setUnacknowledgedSplits(maxUnacknowledgedSplitsPerTask - 1)); + + splitPlacements = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(tasks)); + // One split placed on each node + assertEquals(splitPlacements.getAssignments().size(), nodes.size()); + assertTrue(splitPlacements.getAssignments().keySet().containsAll(nodes)); + } + @Test public void testTaskCompletion() throws Exception @@ -609,7 +698,7 @@ public void testMaxTasksPerStageWittLimit() .setMaxPendingSplitsPerTask(10); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID, 2); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID, 2); Set splits = new HashSet<>(); @@ -647,7 +736,7 @@ public void testMaxTasksPerStageAddingNewNodes() LegacyNetworkTopology networkTopology = new LegacyNetworkTopology(); // refresh interval is 1 nanosecond NodeScheduler nodeScheduler = new NodeScheduler(new NetworkLocationCache(networkTopology), networkTopology, nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap, Duration.valueOf("0s")); - NodeSelector nodeSelector = nodeScheduler.createNodeSelector(CONNECTOR_ID, 2); + NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, CONNECTOR_ID, 2); Set splits = new HashSet<>(); splits.add(new Split(CONNECTOR_ID, transactionHandle, new TestSplitRemote())); @@ -689,6 +778,13 @@ private List getRemoteTableScanTask(SplitPlacementResult splitPlacem return ImmutableList.copyOf(taskMap.values()); } + private static Session sessionWithMaxUnacknowledgedSplitsPerTask(int maxUnacknowledgedSplitsPerTask) + { + return TestingSession.testSessionBuilder() + .setSystemProperty(MAX_UNACKNOWLEDGED_SPLITS_PER_TASK, Integer.toString(maxUnacknowledgedSplitsPerTask)) + .build(); + } + private static class TestSplitLocal implements ConnectorSplit { diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java index d70aa77bef60d..18acce46ef9bf 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestNodeSchedulerConfig.java @@ -32,6 +32,7 @@ public void testDefaults() .setMinCandidates(10) .setMaxSplitsPerNode(100) .setMaxPendingSplitsPerTask(10) + .setMaxUnacknowledgedSplitsPerTask(500) .setIncludeCoordinator(true)); } @@ -43,6 +44,7 @@ public void testExplicitPropertyMappings() .put("node-scheduler.min-candidates", "11") .put("node-scheduler.include-coordinator", "false") .put("node-scheduler.max-pending-splits-per-task", "11") + .put("node-scheduler.max-unacknowledged-splits-per-task", "501") .put("node-scheduler.max-splits-per-node", "101") .build(); @@ -51,6 +53,7 @@ public void testExplicitPropertyMappings() .setIncludeCoordinator(false) .setMaxSplitsPerNode(101) .setMaxPendingSplitsPerTask(11) + .setMaxUnacknowledgedSplitsPerTask(501) .setMinCandidates(11); ConfigAssertions.assertFullMapping(properties, expected); diff --git a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java index 1069ee6cea116..ad40c394125a2 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/scheduler/TestSourcePartitionedScheduler.java @@ -54,6 +54,7 @@ import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.testing.TestingMetadata.TestingColumnHandle; import com.facebook.presto.testing.TestingMetadata.TestingTableHandle; +import com.facebook.presto.testing.TestingSession; import com.facebook.presto.testing.TestingSplit; import com.facebook.presto.testing.TestingTransactionHandle; import com.facebook.presto.util.FinalizerService; @@ -326,7 +327,7 @@ public void testNoNodes() stage, TABLE_SCAN_NODE_ID, new ConnectorAwareSplitSource(CONNECTOR_ID, TestingTransactionHandle.create(), createFixedSplitSource(20, TestingSplit::createRemoteSplit)), - new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(CONNECTOR_ID), stage::getAllTasks), + new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(TestingSession.testSessionBuilder().build(), CONNECTOR_ID), stage::getAllTasks), 2); scheduler.schedule(); @@ -453,7 +454,7 @@ private static StageScheduler getSourcePartitionedScheduler( .setMaxPendingSplitsPerTask(0); NodeScheduler nodeScheduler = new NodeScheduler(new LegacyNetworkTopology(), nodeManager, new NodeSelectionStats(), nodeSchedulerConfig, nodeTaskMap); SplitSource splitSource = new ConnectorAwareSplitSource(CONNECTOR_ID, TestingTransactionHandle.create(), connectorSplitSource); - SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(splitSource.getConnectorId()), stage::getAllTasks); + SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeScheduler.createNodeSelector(TestingSession.testSessionBuilder().build(), splitSource.getConnectorId()), stage::getAllTasks); return newSourcePartitionedSchedulerAsStageScheduler(stage, TABLE_SCAN_NODE_ID, splitSource, placementPolicy, splitBatchSize); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java index d663648354c5e..0b1b498a39b2a 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestAnalyzer.java @@ -17,6 +17,7 @@ import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.execution.QueryManagerConfig; import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.execution.scheduler.NodeSchedulerConfig; import com.facebook.presto.execution.warnings.WarningCollectorConfig; import com.facebook.presto.memory.MemoryManagerConfig; import com.facebook.presto.memory.NodeMemoryConfig; @@ -156,7 +157,8 @@ public void testWindowOrderByAnalysis() new MemoryManagerConfig(), new FeaturesConfig().setAllowWindowOrderByLiterals(false), new NodeMemoryConfig(), - new WarningCollectorConfig()))).build(); + new WarningCollectorConfig(), + new NodeSchedulerConfig()))).build(); assertFails(session, WINDOW_FUNCTION_ORDERBY_LITERAL, "SELECT SUM(x) OVER (PARTITION BY y ORDER BY 1) AS s\n" + "FROM (values (1,10), (2, 10)) AS T(x, y)"); @@ -540,7 +542,8 @@ public void testTooManyGroupingElements() new MemoryManagerConfig(), new FeaturesConfig().setMaxGroupingSets(2048), new NodeMemoryConfig(), - new WarningCollectorConfig()))).build(); + new WarningCollectorConfig(), + new NodeSchedulerConfig()))).build(); analyze(session, "SELECT a, b, c, d, e, f, g, h, i, j, k, SUM(l)" + "FROM (VALUES (1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))\n" + "t (a, b, c, d, e, f, g, h, i, j, k, l)\n" + diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkNodeScheduler.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkNodeScheduler.java index 810e5bf108c93..7446fa769a85e 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkNodeScheduler.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/node/PrestoSparkNodeScheduler.java @@ -14,6 +14,7 @@ package com.facebook.presto.spark.node; import com.facebook.airlift.stats.CounterStat; +import com.facebook.presto.Session; import com.facebook.presto.execution.NodeTaskMap; import com.facebook.presto.execution.scheduler.LegacyNetworkTopology; import com.facebook.presto.execution.scheduler.NetworkLocationCache; @@ -49,13 +50,13 @@ public PrestoSparkNodeScheduler() } @Override - public NodeSelector createNodeSelector(ConnectorId connectorId) + public NodeSelector createNodeSelector(Session session, ConnectorId connectorId) { throw new UnsupportedOperationException(); } @Override - public NodeSelector createNodeSelector(ConnectorId connectorId, int maxTasksPerStage) + public NodeSelector createNodeSelector(Session session, ConnectorId connectorId, int maxTasksPerStage) { throw new UnsupportedOperationException(); }