@@ -20,27 +20,19 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.trino.execution.NodeTaskMap;
import io.trino.execution.RemoteTask;
import io.trino.execution.resourcegroups.IndexedPriorityQueue;
import io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Split;
import io.trino.spi.HostAddress;
import io.trino.spi.SplitWeight;
import io.trino.spi.TrinoException;
import jakarta.annotation.Nullable;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -49,7 +41,6 @@
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.execution.scheduler.NodeScheduler.calculateLowWatermark;
import static io.trino.execution.scheduler.NodeScheduler.filterNodes;
import static io.trino.execution.scheduler.NodeScheduler.getAllNodes;
@@ -59,7 +50,6 @@
import static io.trino.execution.scheduler.NodeScheduler.selectNodes;
import static io.trino.execution.scheduler.NodeScheduler.toWhenHasSplitQueueSpaceFuture;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -168,47 +158,37 @@ public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTas
queueSizeAdjuster.update(existingTasks, assignmentStats);
Set<InternalNode> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;
// splitsToBeRedistributed becomes true only when splits go through locality-based assignment
boolean splitsToBeRedistributed = false;
Set<Split> remainingSplits = new HashSet<>(splits.size());

List<InternalNode> filteredNodes = filterNodes(nodeMap, includeCoordinator, ImmutableSet.of());
ResettableRandomizedIterator<InternalNode> randomCandidates = new ResettableRandomizedIterator<>(filteredNodes);
Set<InternalNode> schedulableNodes = new HashSet<>(filteredNodes);

// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling) {
for (Split split : splits) {
if (split.isRemotelyAccessible() && !split.getAddresses().isEmpty()) {
List<InternalNode> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);

Optional<InternalNode> chosenNode = candidateNodes.stream()
.filter(ownerNode -> assignmentStats.getTotalSplitsWeight(ownerNode) < maxSplitsWeightPerNode && assignmentStats.getUnacknowledgedSplitCountForStage(ownerNode) < maxUnacknowledgedSplitsPerTask)
.min(comparingLong(assignmentStats::getTotalSplitsWeight));

if (chosenNode.isPresent()) {
assignment.put(chosenNode.get(), split);
assignmentStats.addAssignedSplit(chosenNode.get(), split.getSplitWeight());
splitsToBeRedistributed = true;
continue;
}
}
remainingSplits.add(split);
}
}
else {
remainingSplits = splits;
}

for (Split split : remainingSplits) {
for (Split split : splits) {
randomCandidates.reset();

List<InternalNode> candidateNodes;
boolean exactNodes;
if (!split.isRemotelyAccessible()) {
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
exactNodes = true;
}
else {
candidateNodes = selectNodes(minCandidates, randomCandidates);
// optimizedLocalScheduling enables prioritized assignment of splits to local nodes when splits contain locality information
if (optimizedLocalScheduling && !split.getAddresses().isEmpty()) {
candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
if (candidateNodes.isEmpty()) {
// choose any other node if preferred node is not available
candidateNodes = selectNodes(minCandidates, randomCandidates);
exactNodes = false;
}
else {
exactNodes = true;
}
}
else {
candidateNodes = selectNodes(minCandidates, randomCandidates);
exactNodes = false;
}
}
if (candidateNodes.isEmpty()) {
log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
@@ -238,7 +218,7 @@
}
else {
candidateNodes.forEach(schedulableNodes::remove);
if (split.isRemotelyAccessible()) {
if (!exactNodes) {
splitWaitingForAnyNode = true;
}
// Exact node set won't matter, if a split is waiting for any node
@@ -261,9 +241,6 @@ else if (!splitWaitingForAnyNode) {
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(minPendingSplitsWeightPerTask));
}

if (splitsToBeRedistributed) {
equateDistribution(assignment, assignmentStats, nodeMap, includeCoordinator);
}
return new SplitPlacementResult(blocked, assignment);
}
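// A minimal standalone sketch (not Trino code) of the candidate-selection order the rewritten loop
// above applies per split, reduced to plain strings and lists. All names and parameters here are
// hypothetical stand-ins for selectExactNodes/selectNodes and the node types used in the real code.
import java.util.List;

class CandidateSelectionSketch
{
    static List<String> candidates(boolean remotelyAccessible, boolean optimizedLocalScheduling,
            List<String> splitAddresses, List<String> exactOrLocalNodes, List<String> randomNodes)
    {
        if (!remotelyAccessible) {
            // splits pinned to specific hosts may only run there; an empty list means no schedulable node
            return exactOrLocalNodes;
        }
        if (optimizedLocalScheduling && !splitAddresses.isEmpty() && !exactOrLocalNodes.isEmpty()) {
            // prefer the nodes that host the split's data
            return exactOrLocalNodes;
        }
        // no locality information, or the preferred nodes are unavailable: pick random candidates
        return randomNodes;
    }

    public static void main(String[] args)
    {
        System.out.println(candidates(true, true, List.of("10.0.0.1"), List.of("node-1"), List.of("node-2", "node-3"))); // [node-1]
        System.out.println(candidates(true, true, List.of("10.0.0.1"), List.of(), List.of("node-2", "node-3")));         // [node-2, node-3]
    }
}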

@@ -318,129 +295,6 @@ private List<InternalNode> getFreeNodesForStage(NodeAssignmentStats assignmentSt
return freeNodes.build();
}

/**
* The method tries to make the distribution of splits more uniform. All nodes are arranged into a maxHeap and a minHeap
* based on the total weight of the splits assigned to them. Splits are redistributed, one at a time, from a maxNode to a
* minNode until the distribution is as uniform as possible.
*
* @param assignment the node-splits multimap after the first and the second stage
* @param assignmentStats required to obtain info regarding splits assigned to a node outside the current batch of assignment
* @param nodeMap to get a list of all nodes to which splits can be assigned
*/
private void equateDistribution(Multimap<InternalNode, Split> assignment, NodeAssignmentStats assignmentStats, NodeMap nodeMap, boolean includeCoordinator)
{
if (assignment.isEmpty()) {
return;
}

Collection<InternalNode> allNodes = nodeMap.getNodesByHostAndPort().values().stream()
.filter(node -> includeCoordinator || !nodeMap.getCoordinatorNodeIds().contains(node.getNodeIdentifier()))
.collect(toImmutableList());

if (allNodes.size() < 2) {
return;
}

IndexedPriorityQueue<InternalNode> maxNodes = new IndexedPriorityQueue<>();
for (InternalNode node : assignment.keySet()) {
maxNodes.addOrUpdate(node, assignmentStats.getTotalSplitsWeight(node));
}

IndexedPriorityQueue<InternalNode> minNodes = new IndexedPriorityQueue<>();
for (InternalNode node : allNodes) {
minNodes.addOrUpdate(node, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(node));
}

while (true) {
if (maxNodes.isEmpty()) {
return;
}

// fetch min and max node
InternalNode maxNode = maxNodes.poll();
InternalNode minNode = minNodes.poll();

// Allow some degree of non-uniformity when assigning splits to nodes. Usually the data distribution
// among nodes in a cluster won't be fully uniform (e.g. because a hash function with a non-uniform
// distribution, such as consistent hashing, is used). In such cases it makes sense to assign splits to the nodes
// holding the data because of potential savings in network throughput and CPU time.
// The allowed difference of 5 between the nodes with the maximum and minimum splits is a tradeoff between the ratio of
// misassigned splits and assignment uniformity. Using larger values doesn't reduce the number of
// misassigned splits greatly (in absolute terms).
if (assignmentStats.getTotalSplitsWeight(maxNode) - assignmentStats.getTotalSplitsWeight(minNode) <= SplitWeight.rawValueForStandardSplitCount(5)) {
return;
}

// move split from max to min
Split redistributed = redistributeSplit(assignment, maxNode, minNode, nodeMap.getNodesByHost());
assignmentStats.removeAssignedSplit(maxNode, redistributed.getSplitWeight());
assignmentStats.addAssignedSplit(minNode, redistributed.getSplitWeight());

// add max back into maxNodes only if it still has assignments
if (assignment.containsKey(maxNode)) {
maxNodes.addOrUpdate(maxNode, assignmentStats.getTotalSplitsWeight(maxNode));
}

// Add or update both the Priority Queues with the updated node priorities
maxNodes.addOrUpdate(minNode, assignmentStats.getTotalSplitsWeight(minNode));
minNodes.addOrUpdate(minNode, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(minNode));
minNodes.addOrUpdate(maxNode, Long.MAX_VALUE - assignmentStats.getTotalSplitsWeight(maxNode));
}
}
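// A minimal standalone sketch (not Trino code) of the rebalancing loop above: repeatedly move one
// unit of work from the most-loaded node to the least-loaded node until the gap is within a fixed
// threshold. Node names and weights are hypothetical; the real implementation uses
// IndexedPriorityQueue and SplitWeight raw values, with a threshold of 5 standard splits.
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

class RebalanceSketch
{
    private static final long THRESHOLD = 5; // mirrors SplitWeight.rawValueForStandardSplitCount(5) in spirit

    public static void main(String[] args)
    {
        // hypothetical per-node split counts after assignment
        Map<String, Long> weight = new HashMap<>(Map.of("node-1", 20L, "node-2", 3L, "node-3", 9L));

        while (true) {
            PriorityQueue<String> maxNodes = new PriorityQueue<>(Comparator.comparingLong((String node) -> weight.get(node)).reversed());
            PriorityQueue<String> minNodes = new PriorityQueue<>(Comparator.comparingLong(weight::get));
            maxNodes.addAll(weight.keySet());
            minNodes.addAll(weight.keySet());

            String maxNode = maxNodes.poll();
            String minNode = minNodes.poll();
            // stop once the heaviest and lightest nodes differ by at most the threshold
            if (weight.get(maxNode) - weight.get(minNode) <= THRESHOLD) {
                break;
            }
            // move one unit of work from the heaviest node to the lightest node
            weight.merge(maxNode, -1L, Long::sum);
            weight.merge(minNode, 1L, Long::sum);
        }
        System.out.println(weight); // e.g. node-1=14, node-2=9, node-3=9
    }
}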

/**
* The method selects and removes a split from the fromNode and assigns it to the toNode. It first tries to
* redistribute a non-local split, which can exist when multiple queries are running
* simultaneously. If no non-local split is found on the maxNode, an arbitrary split is selected and reassigned.
*/
@VisibleForTesting
public static Split redistributeSplit(Multimap<InternalNode, Split> assignment, InternalNode fromNode, InternalNode toNode, SetMultimap<InetAddress, InternalNode> nodesByHost)
{
Iterator<Split> splitIterator = assignment.get(fromNode).iterator();
Split splitToBeRedistributed = null;
while (splitIterator.hasNext()) {
Split split = splitIterator.next();
// Try to select non-local split for redistribution
if (!split.getAddresses().isEmpty() && !isSplitLocal(split.getAddresses(), fromNode.getHostAndPort(), nodesByHost)) {
splitToBeRedistributed = split;
break;
}
}
// Select any split if maxNode has no non-local splits in the current batch of assignment
if (splitToBeRedistributed == null) {
splitIterator = assignment.get(fromNode).iterator();
splitToBeRedistributed = splitIterator.next();
}
splitIterator.remove();
assignment.put(toNode, splitToBeRedistributed);
return splitToBeRedistributed;
}
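// A minimal standalone sketch (not Trino code) of the selection rule above, shown on plain strings:
// prefer moving an element whose data is not local to the source node; if every element is local,
// move an arbitrary one. The "node/split" naming convention used to model locality is hypothetical.
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

class RedistributeSketch
{
    static String redistribute(Multimap<String, String> assignment, String fromNode, String toNode)
    {
        String chosen = assignment.get(fromNode).stream()
                .filter(split -> !split.startsWith(fromNode + "/")) // "non-local": data lives on another node
                .findFirst()
                .orElseGet(() -> assignment.get(fromNode).iterator().next());
        assignment.remove(fromNode, chosen);
        assignment.put(toNode, chosen);
        return chosen;
    }

    public static void main(String[] args)
    {
        Multimap<String, String> assignment = HashMultimap.create();
        assignment.put("node-1", "node-1/split-a"); // local to node-1
        assignment.put("node-1", "node-2/split-b"); // remote data, preferred for redistribution
        System.out.println(redistribute(assignment, "node-1", "node-2")); // node-2/split-b
    }
}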

/**
* Helper method to determine if a split is local to a node irrespective of whether splitAddresses contain port information or not
*/
private static boolean isSplitLocal(List<HostAddress> splitAddresses, HostAddress nodeAddress, SetMultimap<InetAddress, InternalNode> nodesByHost)
{
for (HostAddress address : splitAddresses) {
if (nodeAddress.equals(address)) {
return true;
}
InetAddress inetAddress;
try {
inetAddress = address.toInetAddress();
}
catch (UnknownHostException e) {
continue;
}
if (!address.hasPort()) {
Set<InternalNode> localNodes = nodesByHost.get(inetAddress);
return localNodes.stream()
.anyMatch(node -> node.getHostAndPort().equals(nodeAddress));
}
}
return false;
}
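// A minimal standalone sketch (not Trino code) of the matching rule above, using Guava's HostAndPort
// in place of HostAddress and skipping the InetAddress resolution step: an address that carries a
// port must match exactly, while an address without a port matches any node on that host. The
// addresses are hypothetical.
import com.google.common.net.HostAndPort;

class LocalityMatchSketch
{
    static boolean matches(HostAndPort splitAddress, HostAndPort nodeAddress)
    {
        if (splitAddress.hasPort()) {
            // with a port, the split address must match the node address exactly
            return splitAddress.equals(nodeAddress);
        }
        // without a port, match on host only
        return splitAddress.getHost().equals(nodeAddress.getHost());
    }

    public static void main(String[] args)
    {
        HostAndPort node = HostAndPort.fromString("10.0.0.1:8080");
        System.out.println(matches(HostAndPort.fromString("10.0.0.1"), node));      // true: host-only address
        System.out.println(matches(HostAndPort.fromString("10.0.0.1:9000"), node)); // false: port mismatch
    }
}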

static class QueueSizeAdjuster
{
private static final long SCALE_DOWN_INTERVAL = SECONDS.toNanos(1);