diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BucketNodeMap.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BucketNodeMap.java index a5c0be6dbdd0..e6c618f642af 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BucketNodeMap.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BucketNodeMap.java @@ -13,35 +13,43 @@ */ package io.trino.execution.scheduler; +import com.google.common.collect.ImmutableList; import io.trino.metadata.InternalNode; import io.trino.metadata.Split; +import java.util.List; import java.util.function.ToIntFunction; import static java.util.Objects.requireNonNull; -public abstract class BucketNodeMap +public final class BucketNodeMap { + private final List bucketToNode; private final ToIntFunction splitToBucket; - protected BucketNodeMap(ToIntFunction splitToBucket) + public BucketNodeMap(ToIntFunction splitToBucket, List bucketToNode) { this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null"); + this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null")); } - public abstract int getBucketCount(); - - public abstract InternalNode getAssignedNode(int bucketedId); + public int getBucketCount() + { + return bucketToNode.size(); + } - public abstract boolean isDynamic(); + public int getBucket(Split split) + { + return splitToBucket.applyAsInt(split); + } - public final InternalNode getAssignedNode(Split split) + public InternalNode getAssignedNode(int bucketId) { - return getAssignedNode(splitToBucket.applyAsInt(split)); + return bucketToNode.get(bucketId); } - public final int getBucket(Split split) + public InternalNode getAssignedNode(Split split) { - return splitToBucket.applyAsInt(split); + return getAssignedNode(getBucket(split)); } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FixedBucketNodeMap.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FixedBucketNodeMap.java deleted file mode 100644 index 21b5e9190aae..000000000000 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FixedBucketNodeMap.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution.scheduler; - -import com.google.common.collect.ImmutableList; -import io.trino.metadata.InternalNode; -import io.trino.metadata.Split; - -import java.util.List; -import java.util.NoSuchElementException; -import java.util.function.ToIntFunction; - -import static java.util.Objects.requireNonNull; - -// the bucket to node mapping is fixed and pre-assigned -public class FixedBucketNodeMap - extends BucketNodeMap -{ - private final List bucketToNode; - - public FixedBucketNodeMap(ToIntFunction splitToBucket, List bucketToNode) - { - super(splitToBucket); - this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null")); - } - - @Override - public InternalNode getAssignedNode(int bucketId) - { - InternalNode node = bucketToNode.get(bucketId); - if (node == null) { - throw new NoSuchElementException("No node for bucket: " + bucketId); - } - return node; - } - - @Override - public int getBucketCount() - { - return bucketToNode.size(); - } - - @Override - public boolean isDynamic() - { - return false; - } -} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 5774626a0f6c..7bdc8a093833 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -1438,8 +1438,7 @@ public void stateChanged(QueryState newState) List stageNodeList; if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) { // no remote source - bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, false); - + bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle); stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogHandle).allNodes()); Collections.shuffle(stageNodeList); } @@ -1892,30 +1891,21 @@ private static BucketToPartition createBucketToPartitionMap( return new BucketToPartition(Optional.of(IntStream.range(0, partitionCount).toArray()), Optional.empty()); } if (partitioningHandle.getCatalogHandle().isPresent()) { - BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, true); + BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle); int bucketCount = bucketNodeMap.getBucketCount(); int[] bucketToPartition = new int[bucketCount]; - if (bucketNodeMap.isDynamic()) { - int nextPartitionId = 0; - for (int bucket = 0; bucket < bucketCount; bucket++) { - bucketToPartition[bucket] = nextPartitionId % partitionCount; + // make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling + Map nodeToPartition = new HashMap<>(); + int nextPartitionId = 0; + for (int bucket = 0; bucket < bucketCount; bucket++) { + InternalNode node = bucketNodeMap.getAssignedNode(bucket); + Integer partitionId = nodeToPartition.get(node); + if (partitionId == null) { + partitionId = nextPartitionId; nextPartitionId++; + nodeToPartition.put(node, partitionId); } - } - else { - // make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling - Map nodeToPartition = new HashMap<>(); - int nextPartitionId = 0; - for (int bucket = 0; bucket < bucketCount; bucket++) { - InternalNode node = bucketNodeMap.getAssignedNode(bucket); - Integer partitionId = nodeToPartition.get(node); - if (partitionId == null) { - partitionId = nextPartitionId; - nextPartitionId++; - nodeToPartition.put(node, partitionId); - } - bucketToPartition[bucket] = partitionId; - } + bucketToPartition[bucket] = partitionId; } return new BucketToPartition(Optional.of(bucketToPartition), Optional.of(bucketNodeMap)); } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java index f7078e00679b..cee3758fc26a 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/StageTaskSourceFactory.java @@ -512,7 +512,7 @@ public synchronized ListenableFuture> getMoreTasks() int bucket = bucketNodeMap.getBucket(split); int partition = getPartitionForBucket(bucket); - if (!bucketNodeMap.isDynamic()) { + { HostAddress requiredAddress = bucketNodeMap.getAssignedNode(split).getHostAndPort(); Set existingRequirement = partitionToNodeMap.get(partition); if (existingRequirement.isEmpty()) { diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/group/DynamicBucketNodeMap.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/group/DynamicBucketNodeMap.java deleted file mode 100644 index e6d377e64085..000000000000 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/group/DynamicBucketNodeMap.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.execution.scheduler.group; - -import io.trino.execution.scheduler.BucketNodeMap; -import io.trino.metadata.InternalNode; -import io.trino.metadata.Split; - -import java.util.function.ToIntFunction; - -import static com.google.common.base.Preconditions.checkArgument; - -public class DynamicBucketNodeMap - extends BucketNodeMap -{ - private final int bucketCount; - - public DynamicBucketNodeMap(ToIntFunction splitToBucket, int bucketCount) - { - super(splitToBucket); - checkArgument(bucketCount > 0, "bucketCount must be positive"); - this.bucketCount = bucketCount; - } - - @Override - public InternalNode getAssignedNode(int bucketedId) - { - throw new UnsupportedOperationException("DynamicBucketNodeMap does not support assigned nodes"); - } - - @Override - public int getBucketCount() - { - return bucketCount; - } - - @Override - public boolean isDynamic() - { - return true; - } -} diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitionMap.java b/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitionMap.java index de14864128e6..c4faa6dfe1e3 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitionMap.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitionMap.java @@ -15,7 +15,6 @@ import com.google.common.collect.ImmutableList; import io.trino.execution.scheduler.BucketNodeMap; -import io.trino.execution.scheduler.FixedBucketNodeMap; import io.trino.metadata.InternalNode; import io.trino.metadata.Split; @@ -76,6 +75,6 @@ public BucketNodeMap asBucketNodeMap() for (int partition : bucketToPartition) { bucketToNode.add(partitionToNode.get(partition)); } - return new FixedBucketNodeMap(splitToBucket, bucketToNode.build()); + return new BucketNodeMap(splitToBucket, bucketToNode.build()); } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java b/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java index 5bce806abe20..e0d8343b1140 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/NodePartitioningManager.java @@ -20,10 +20,8 @@ import io.trino.connector.CatalogHandle; import io.trino.connector.CatalogServiceProvider; import io.trino.execution.scheduler.BucketNodeMap; -import io.trino.execution.scheduler.FixedBucketNodeMap; import io.trino.execution.scheduler.NodeScheduler; import io.trino.execution.scheduler.NodeSelector; -import io.trino.execution.scheduler.group.DynamicBucketNodeMap; import io.trino.metadata.InternalNode; import io.trino.metadata.Split; import io.trino.operator.BucketPartitionFunction; @@ -227,24 +225,18 @@ private NodePartitionMap systemNodePartitionMap(Session session, PartitioningHan }); } - public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle, boolean preferDynamic) + public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle) { Optional bucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle); ToIntFunction splitToBucket = getSplitToBucket(session, partitioningHandle); if (bucketNodeMap.map(ConnectorBucketNodeMap::hasFixedMapping).orElse(false)) { - return new FixedBucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get())); - } - - if (preferDynamic) { - int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount) - .orElseGet(() -> getNodeCount(session, partitioningHandle)); - return new DynamicBucketNodeMap(splitToBucket, bucketCount); + return new BucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get())); } List nodes = getAllNodes(session, requiredCatalogHandle(partitioningHandle)); int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount).orElseGet(nodes::size); - return new FixedBucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount)); + return new BucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount)); } public int getNodeCount(Session session, PartitioningHandle partitioningHandle) diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java index ed7950220191..06d3d8f4219f 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestStageTaskSourceFactory.java @@ -25,13 +25,14 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.DataSize; +import io.trino.client.NodeVersion; import io.trino.execution.TableExecuteContextManager; import io.trino.execution.scheduler.StageTaskSourceFactory.ArbitraryDistributionTaskSource; import io.trino.execution.scheduler.StageTaskSourceFactory.HashDistributionTaskSource; import io.trino.execution.scheduler.StageTaskSourceFactory.SingleDistributionTaskSource; import io.trino.execution.scheduler.StageTaskSourceFactory.SourceDistributionTaskSource; import io.trino.execution.scheduler.TestingExchange.TestingExchangeSourceHandle; -import io.trino.execution.scheduler.group.DynamicBucketNodeMap; +import io.trino.metadata.InternalNode; import io.trino.metadata.Split; import io.trino.spi.HostAddress; import io.trino.spi.QueryId; @@ -46,6 +47,7 @@ import org.openjdk.jol.info.ClassLayout; import org.testng.annotations.Test; +import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.IdentityHashMap; @@ -69,6 +71,7 @@ import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.spi.exchange.ExchangeId.createRandomExchangeId; import static io.trino.testing.TestingHandles.TEST_CATALOG_HANDLE; +import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.guava.api.Assertions.assertThat; @@ -78,6 +81,7 @@ public class TestStageTaskSourceFactory { + private static final HostAddress NODE_ADDRESS = HostAddress.fromString("testaddress"); private static final PlanNodeId PLAN_NODE_1 = new PlanNodeId("planNode1"); private static final PlanNodeId PLAN_NODE_2 = new PlanNodeId("planNode2"); private static final PlanNodeId PLAN_NODE_3 = new PlanNodeId("planNode3"); @@ -335,16 +339,28 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), DataSize.of(0, BYTE)); assertFalse(taskSource.isFinished()); assertEquals(getFutureValue(taskSource.getMoreTasks()), ImmutableList.of( - new TaskDescriptor(0, ImmutableListMultimap.of(), ImmutableListMultimap.of( - PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), - PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), - new TaskDescriptor(1, ImmutableListMultimap.of(), ImmutableListMultimap.of( - PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), - new TaskDescriptor(2, ImmutableListMultimap.of(), ImmutableListMultimap.of( - PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + new TaskDescriptor( + 0, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + new TaskDescriptor( + 1, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + new TaskDescriptor( + 2, + ImmutableListMultimap.of(), + ImmutableListMultimap.of( + PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); Split bucketedSplit1 = createBucketedSplit(0, 0); @@ -371,25 +387,29 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), ImmutableListMultimap.of( PLAN_NODE_4, bucketedSplit1), ImmutableListMultimap.of( - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 1, ImmutableListMultimap.of( PLAN_NODE_5, bucketedSplit4), ImmutableListMultimap.of( - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 2, ImmutableListMultimap.of( PLAN_NODE_4, bucketedSplit2), ImmutableListMultimap.of( - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 3, ImmutableListMultimap.of( PLAN_NODE_4, bucketedSplit3), ImmutableListMultimap.of( - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); taskSource = createHashDistributionTaskSource( @@ -417,27 +437,31 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), ImmutableListMultimap.of( PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 1, ImmutableListMultimap.of( PLAN_NODE_5, bucketedSplit4), ImmutableListMultimap.of( PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 2, ImmutableListMultimap.of( PLAN_NODE_4, bucketedSplit2), ImmutableListMultimap.of( - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 3, ImmutableListMultimap.of( PLAN_NODE_4, bucketedSplit3), ImmutableListMultimap.of( PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); taskSource = createHashDistributionTaskSource( @@ -464,7 +488,8 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), ImmutableListMultimap.of( PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), PLAN_NODE_2, new TestingExchangeSourceHandle(0, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 1, ImmutableListMultimap.of( @@ -472,7 +497,8 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Option PLAN_NODE_5, bucketedSplit4), ImmutableListMultimap.of( PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1), - PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + PLAN_NODE_3, new TestingExchangeSourceHandle(0, 1)), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); // join based on split target split weight @@ -505,7 +531,7 @@ PLAN_NODE_1, new TestingExchangeSourceHandle(0, 1), PLAN_NODE_1, new TestingExchangeSourceHandle(1, 1), PLAN_NODE_2, new TestingExchangeSourceHandle(1, 1), PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), - new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 1, ImmutableListMultimap.of( @@ -515,7 +541,7 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), PLAN_NODE_2, new TestingExchangeSourceHandle(2, 1), PLAN_NODE_2, new TestingExchangeSourceHandle(3, 1), PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), - new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); // join based on target exchange size @@ -548,7 +574,7 @@ PLAN_NODE_1, new TestingExchangeSourceHandle(0, 20), PLAN_NODE_1, new TestingExchangeSourceHandle(1, 30), PLAN_NODE_2, new TestingExchangeSourceHandle(1, 20), PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), - new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 1, ImmutableListMultimap.of( @@ -556,7 +582,7 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), ImmutableListMultimap.of( PLAN_NODE_2, new TestingExchangeSourceHandle(2, 99), PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), - new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))), + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))), new TaskDescriptor( 2, ImmutableListMultimap.of( @@ -564,7 +590,7 @@ PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), ImmutableListMultimap.of( PLAN_NODE_2, new TestingExchangeSourceHandle(3, 30), PLAN_NODE_3, new TestingExchangeSourceHandle(17, 1)), - new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(), DataSize.of(4, GIGABYTE))))); + new NodeRequirements(Optional.of(TEST_CATALOG_HANDLE), ImmutableSet.of(NODE_ADDRESS), DataSize.of(4, GIGABYTE))))); assertTrue(taskSource.isFinished()); } @@ -968,10 +994,9 @@ private static Optional> addressesList(String... addresses) private static BucketNodeMap getTestingBucketNodeMap(int bucketCount) { - return new DynamicBucketNodeMap(split -> { - TestingConnectorSplit testingConnectorSplit = (TestingConnectorSplit) split.getConnectorSplit(); - return testingConnectorSplit.getBucket().getAsInt(); - }, bucketCount); + return new BucketNodeMap( + split -> ((TestingConnectorSplit) split.getConnectorSplit()).getBucket().orElseThrow(), + nCopies(bucketCount, new InternalNode("local", URI.create("local://" + NODE_ADDRESS), NodeVersion.UNKNOWN, true))); } private static class TestingConnectorSplit