Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,43 @@
*/
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;

import java.util.List;
import java.util.function.ToIntFunction;

import static java.util.Objects.requireNonNull;

/**
 * Associates each bucket with the {@link InternalNode} assigned to it and
 * resolves the bucket that a {@link Split} belongs to.
 * <p>
 * NOTE(review): this listing appears to be a rendered diff in which pre-change
 * and post-change lines are interleaved. The member comments below are placed
 * directly above the post-change declarations and describe those.
 */
public abstract class BucketNodeMap
public final class BucketNodeMap
{
// Node assigned to each bucket; the list index is used as the bucket id (see getAssignedNode(int)).
private final List<InternalNode> bucketToNode;
// Function that computes the bucket id for a split.
private final ToIntFunction<Split> splitToBucket;

protected BucketNodeMap(ToIntFunction<Split> splitToBucket)
/**
 * Creates a map from a split-to-bucket function and a bucket-to-node list.
 * The list is stored as an immutable copy, so later changes to the
 * caller's list are not reflected here.
 *
 * @param splitToBucket function returning the bucket id of a split
 * @param bucketToNode node assigned to each bucket, indexed by bucket id
 * @throws NullPointerException if either argument is null
 */
public BucketNodeMap(ToIntFunction<Split> splitToBucket, List<InternalNode> bucketToNode)
{
this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null"));
}

public abstract int getBucketCount();

public abstract InternalNode getAssignedNode(int bucketedId);
/**
 * Returns the number of buckets, i.e. the size of the bucket-to-node list.
 */
public int getBucketCount()
{
return bucketToNode.size();
}

public abstract boolean isDynamic();
/**
 * Returns the bucket id of the given split, as computed by the
 * split-to-bucket function supplied at construction.
 */
public int getBucket(Split split)
{
return splitToBucket.applyAsInt(split);
}

public final InternalNode getAssignedNode(Split split)
/**
 * Returns the node stored at position {@code bucketId} of the bucket-to-node list.
 *
 * @throws IndexOutOfBoundsException if {@code bucketId} is not a valid index
 *         into the list (negative, or not less than {@link #getBucketCount()})
 */
public InternalNode getAssignedNode(int bucketId)
{
return getAssignedNode(splitToBucket.applyAsInt(split));
return bucketToNode.get(bucketId);
}

public final int getBucket(Split split)
/**
 * Returns the node assigned to the bucket of the given split; equivalent to
 * {@code getAssignedNode(getBucket(split))}.
 */
public InternalNode getAssignedNode(Split split)
{
return splitToBucket.applyAsInt(split);
return getAssignedNode(getBucket(split));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1438,8 +1438,7 @@ public void stateChanged(QueryState newState)
List<InternalNode> stageNodeList;
if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
// no remote source
bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, false);

bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle);
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogHandle).allNodes());
Collections.shuffle(stageNodeList);
}
Expand Down Expand Up @@ -1892,30 +1891,21 @@ private static BucketToPartition createBucketToPartitionMap(
return new BucketToPartition(Optional.of(IntStream.range(0, partitionCount).toArray()), Optional.empty());
}
if (partitioningHandle.getCatalogHandle().isPresent()) {
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, true);
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle);
int bucketCount = bucketNodeMap.getBucketCount();
int[] bucketToPartition = new int[bucketCount];
if (bucketNodeMap.isDynamic()) {
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
bucketToPartition[bucket] = nextPartitionId % partitionCount;
// make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling
Map<InternalNode, Integer> nodeToPartition = new HashMap<>();
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
InternalNode node = bucketNodeMap.getAssignedNode(bucket);
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
partitionId = nextPartitionId;
nextPartitionId++;
nodeToPartition.put(node, partitionId);
}
}
else {
// make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling
Map<InternalNode, Integer> nodeToPartition = new HashMap<>();
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
InternalNode node = bucketNodeMap.getAssignedNode(bucket);
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
partitionId = nextPartitionId;
nextPartitionId++;
nodeToPartition.put(node, partitionId);
}
bucketToPartition[bucket] = partitionId;
}
bucketToPartition[bucket] = partitionId;
}
return new BucketToPartition(Optional.of(bucketToPartition), Optional.of(bucketNodeMap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public synchronized ListenableFuture<List<TaskDescriptor>> getMoreTasks()
int bucket = bucketNodeMap.getBucket(split);
int partition = getPartitionForBucket(bucket);

if (!bucketNodeMap.isDynamic()) {
{
HostAddress requiredAddress = bucketNodeMap.getAssignedNode(split).getHostAndPort();
Set<HostAddress> existingRequirement = partitionToNodeMap.get(partition);
if (existingRequirement.isEmpty()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.FixedBucketNodeMap;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;

Expand Down Expand Up @@ -76,6 +75,6 @@ public BucketNodeMap asBucketNodeMap()
for (int partition : bucketToPartition) {
bucketToNode.add(partitionToNode.get(partition));
}
return new FixedBucketNodeMap(splitToBucket, bucketToNode.build());
return new BucketNodeMap(splitToBucket, bucketToNode.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.trino.connector.CatalogHandle;
import io.trino.connector.CatalogServiceProvider;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.FixedBucketNodeMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.execution.scheduler.group.DynamicBucketNodeMap;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.BucketPartitionFunction;
Expand Down Expand Up @@ -227,24 +225,18 @@ private NodePartitionMap systemNodePartitionMap(Session session, PartitioningHan
});
}

public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle, boolean preferDynamic)
/**
 * Builds the {@link BucketNodeMap} for the given partitioning handle.
 * <p>
 * If the connector bucket node map is present and reports a fixed mapping,
 * the result uses the mapping returned by {@code getFixedMapping}. Otherwise
 * the bucket-to-node list is produced by {@code createArbitraryBucketToNode}
 * from the nodes returned by {@code getAllNodes}, using the connector's bucket
 * count when a connector map is present and the number of those nodes when
 * it is not.
 * <p>
 * NOTE(review): this listing appears to be a rendered diff in which pre-change
 * and post-change lines are interleaved; this comment describes the
 * post-change two-argument method, which no longer has a
 * {@code preferDynamic} branch.
 */
public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle)
{
Optional<ConnectorBucketNodeMap> bucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);

ToIntFunction<Split> splitToBucket = getSplitToBucket(session, partitioningHandle);
// Use the connector's mapping when it declares a fixed one; an absent connector map counts as "not fixed".
if (bucketNodeMap.map(ConnectorBucketNodeMap::hasFixedMapping).orElse(false)) {
return new FixedBucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get()));
}

if (preferDynamic) {
int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount)
.orElseGet(() -> getNodeCount(session, partitioningHandle));
return new DynamicBucketNodeMap(splitToBucket, bucketCount);
return new BucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get()));
}

// No fixed mapping: build an assignment over the nodes returned by getAllNodes for the handle's catalog.
List<InternalNode> nodes = getAllNodes(session, requiredCatalogHandle(partitioningHandle));
// Bucket count comes from the connector map when present; otherwise it equals the number of nodes.
int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount).orElseGet(nodes::size);
return new FixedBucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount));
return new BucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount));
}

public int getNodeCount(Session session, PartitioningHandle partitioningHandle)
Expand Down
Loading