diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java
index bc2ac0403ce01..d1ecbc4726263 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java
@@ -281,6 +281,8 @@ public Map getInfoMap()
                 .put("partitionName", partitionName)
                 .put("s3SelectPushdownEnabled", Boolean.toString(s3SelectPushdownEnabled))
                 .put("cacheQuotaRequirement", cacheQuotaRequirement.toString())
+                .put("readBucketNumber", readBucketNumber.toString())
+                .put("tableBucketNumber", tableBucketNumber.toString())
                 .build();
     }
 
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java
index 68b13a5cf3e6d..8914418586069 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveIntegrationSmokeTest.java
@@ -4331,6 +4331,33 @@ public static Consumer assertRemoteExchangesCount(int expectedRemoteExchan
         };
     }
 
+    @Test
+    public void testGroupedJoinWithUngroupedSemiJoin()
+    {
+        Session groupedExecutionSession = Session.builder(getSession())
+                .setSystemProperty(COLOCATED_JOIN, "true")
+                .setSystemProperty(GROUPED_EXECUTION, "true")
+                .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
+                .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "AUTOMATIC")
+                .build();
+        try {
+            assertUpdate("CREATE TABLE big_bucketed_table \n" +
+                            "WITH (bucket_count = 16, bucketed_by = ARRAY['key1']) AS\n" +
+                            "SELECT orderkey key1 FROM orders",
+                    15000);
+            assertUpdate("CREATE TABLE small_unbucketed_table AS\n" +
+                            "SELECT nationkey key2 FROM nation",
+                    25);
+            assertQuery(groupedExecutionSession,
+                    "SELECT count(*) from big_bucketed_table t1 JOIN (SELECT key1 FROM big_bucketed_table where key1 IN (SELECT key2 from small_unbucketed_table)) t2 on t1.key1 = t2.key1 group by t1.key1",
+                    "SELECT count(*) from orders where orderkey < 25 group by orderkey");
+        }
+        finally {
+            assertUpdate("DROP TABLE IF EXISTS big_bucketed_table");
+            assertUpdate("DROP TABLE IF EXISTS small_unbucketed_table");
+        }
+    }
+
     @Test
     public void testRcTextCharDecoding()
     {
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java
index 7ca6b9308e871..215e2e98d5e40 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/FixedSourcePartitionedScheduler.java
@@ -142,11 +142,9 @@ public FixedSourcePartitionedScheduler(
         else {
             LifespanScheduler lifespanScheduler;
             if (bucketNodeMap.isDynamic()) {
-                // Callee of the constructor guarantees dynamic bucket node map will only be
-                // used when the stage has no remote source.
-                //
-                // When the stage has no remote source, any scan is grouped execution guarantees
-                // all scan is grouped execution.
+                // Caller of the constructor guarantees dynamic bucket node map will only be
+                // used when the stage has no non-replicated remote sources and all scans use grouped
+                // execution.
                 lifespanScheduler = new DynamicLifespanScheduler(bucketNodeMap, nodes, partitionHandles, concurrentLifespansPerTask);
             }
             else {
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
index 53282484172c4..b116f2c593bfb 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/NodeScheduler.java
@@ -30,6 +30,7 @@
 import com.facebook.presto.spi.ConnectorId;
 import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.Node;
+import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.SplitContext;
 import com.facebook.presto.spi.SplitWeight;
 import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManager;
@@ -67,6 +68,7 @@
 import static com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy.CONSISTENT_HASHING;
 import static com.facebook.presto.metadata.InternalNode.NodeStatus.ALIVE;
 import static com.facebook.presto.spi.NodeState.ACTIVE;
+import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Suppliers.memoizeWithExpiration;
 import static com.google.common.collect.ImmutableList.toImmutableList;
@@ -75,6 +77,7 @@
 import static java.lang.Math.addExact;
 import static java.lang.Math.ceil;
 import static java.lang.Math.min;
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -411,7 +414,11 @@ public static SplitPlacementResult selectDistributionNodes(
         Set<InternalNode> blockedNodes = new HashSet<>();
         for (Split split : splits) {
             // node placement is forced by the bucket to node map
-            InternalNode node = bucketNodeMap.getAssignedNode(split).get();
+            Optional<InternalNode> optionalNode = bucketNodeMap.getAssignedNode(split);
+            if (!optionalNode.isPresent()) {
+                throw new PrestoException(NO_NODES_AVAILABLE, format("No assignment for split in bucketNodeMap. Split Info: %s", split.getConnectorSplit().getInfoMap()));
+            }
+            InternalNode node = optionalNode.get();
             boolean isCacheable = bucketNodeMap.isSplitCacheable(split);
             SplitWeight splitWeight = split.getSplitWeight();
 
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java
index 060379cd7a4a7..39fb36a29efe7 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/scheduler/SectionExecutionFactory.java
@@ -361,9 +361,10 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
                 }
                 else {
                     // cannot use dynamic lifespan schedule
-                    verify(!plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule());
+                    verify(!plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule(),
+                            "Stage was planned with DYNAMIC_LIFESPAN_SCHEDULE_GROUPED_EXECUTION, but is not eligible.");
 
-                    // remote source requires nodePartitionMap
+                    // Partitioned remote source requires nodePartitionMap
                     NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
                     if (groupedExecutionForStage) {
                         checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java
index 2c14807b7d003..e3f39b7f98d80 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java
@@ -36,6 +36,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 
@@ -149,7 +150,8 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
         PlanFragment fragment = subPlan.getFragment();
         GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null);
         if (properties.isSubTreeUseful()) {
-            boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE);
+            boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
+                    && new HashSet<>(properties.getCapableTableScanNodes()).containsAll(fragment.getTableScanSchedulingOrder());
             BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, fragment.getPartitioning(), preferDynamic);
             if (bucketNodeMap.isDynamic()) {
                 /*