From f45ac5e3dcbea5b212c7e7fc68f1c3b005c0d34e Mon Sep 17 00:00:00 2001 From: Ke Wang Date: Wed, 18 Nov 2020 17:27:06 -0800 Subject: [PATCH] Fix split cacheable setting for affinity scheduling Currently, when we schedule a scan of a bucketed table that is then joined with a remote source, we do not set cacheable to true even when affinity scheduling is enabled. --- .../com/facebook/presto/sql/planner/NodePartitionMap.java | 7 +++++-- .../presto/sql/planner/NodePartitioningManager.java | 8 +++++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitionMap.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitionMap.java index fc2b94fce83f0..b8a6fc1637b79 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitionMap.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitionMap.java @@ -40,19 +40,22 @@ public class NodePartitionMap private final List partitionToNode; private final int[] bucketToPartition; private final ToIntFunction splitToBucket; + private final boolean cacheable; public NodePartitionMap(List partitionToNode, ToIntFunction splitToBucket) { this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null")); this.bucketToPartition = IntStream.range(0, partitionToNode.size()).toArray(); this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null"); + this.cacheable = false; } - public NodePartitionMap(List partitionToNode, int[] bucketToPartition, ToIntFunction splitToBucket) + public NodePartitionMap(List partitionToNode, int[] bucketToPartition, ToIntFunction splitToBucket, boolean cacheable) { this.bucketToPartition = requireNonNull(bucketToPartition, "bucketToPartition is null"); this.partitionToNode = ImmutableList.copyOf(requireNonNull(partitionToNode, "partitionToNode is null")); this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null"); + 
this.cacheable = cacheable; } public List getPartitionToNode() @@ -78,6 +81,6 @@ public BucketNodeMap asBucketNodeMap() for (int partition : bucketToPartition) { bucketToNode.add(partitionToNode.get(partition)); } - return new FixedBucketNodeMap(splitToBucket, bucketToNode.build(), false); + return new FixedBucketNodeMap(splitToBucket, bucketToNode.build(), cacheable); } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java index a5d651b7eade2..1d93a12afb3a8 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/NodePartitioningManager.java @@ -127,15 +127,21 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand List bucketToNode; NodeSelectionStrategy nodeSelectionStrategy = connectorBucketNodeMap.getNodeSelectionStrategy(); + boolean cacheable; switch (nodeSelectionStrategy) { case HARD_AFFINITY: + bucketToNode = getFixedMapping(connectorBucketNodeMap); + cacheable = false; + break; case SOFT_AFFINITY: bucketToNode = getFixedMapping(connectorBucketNodeMap); + cacheable = true; break; case NO_PREFERENCE: bucketToNode = createArbitraryBucketToNode( nodeScheduler.createNodeSelector(connectorId).selectRandomNodes(getMaxTasksPerStage(session)), connectorBucketNodeMap.getBucketCount()); + cacheable = false; break; default: throw new PrestoException(NODE_SELECTION_NOT_SUPPORTED, format("Unsupported node selection strategy %s", nodeSelectionStrategy)); @@ -158,7 +164,7 @@ public NodePartitionMap getNodePartitioningMap(Session session, PartitioningHand .mapToObj(partitionId -> nodeToPartition.inverse().get(partitionId)) .collect(toImmutableList()); - return new NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle)); + return new 
NodePartitionMap(partitionToNode, bucketToPartition, getSplitToBucket(session, partitioningHandle), cacheable); } public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle, boolean preferDynamic)