Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ public Map<String, String> getInfoMap()
.put("partitionName", partitionName)
.put("s3SelectPushdownEnabled", Boolean.toString(s3SelectPushdownEnabled))
.put("cacheQuotaRequirement", cacheQuotaRequirement.toString())
.put("readBucketNumber", readBucketNumber.toString())
.put("tableBucketNumber", tableBucketNumber.toString())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4331,6 +4331,33 @@ public static Consumer<Plan> assertRemoteExchangesCount(int expectedRemoteExchan
};
}

@Test
public void testGroupedJoinWithUngroupedSemiJoin()
{
    // Session that forces colocated grouped execution, one lifespan at a time,
    // and lets the optimizer choose the join distribution automatically.
    Session session = Session.builder(getSession())
            .setSystemProperty(JOIN_DISTRIBUTION_TYPE, "AUTOMATIC")
            .setSystemProperty(CONCURRENT_LIFESPANS_PER_NODE, "1")
            .setSystemProperty(GROUPED_EXECUTION, "true")
            .setSystemProperty(COLOCATED_JOIN, "true")
            .build();
    try {
        // Bucketed probe/build table: one row per orders.orderkey.
        assertUpdate("CREATE TABLE big_bucketed_table \n" +
                "WITH (bucket_count = 16, bucketed_by = ARRAY['key1']) AS\n" +
                "SELECT orderkey key1 FROM orders",
                15000);
        // Small unbucketed table feeding the IN (semi-join) subquery.
        assertUpdate("CREATE TABLE small_unbucketed_table AS\n" +
                "SELECT nationkey key2 FROM nation",
                25);
        // Grouped join between bucketed tables where one join input contains an
        // ungrouped semi-join source; result must match the equivalent plain query.
        assertQuery(session,
                "SELECT count(*) from big_bucketed_table t1 JOIN (SELECT key1 FROM big_bucketed_table where key1 IN (SELECT key2 from small_unbucketed_table)) t2 on t1.key1 = t2.key1 group by t1.key1",
                "SELECT count(*) from orders where orderkey < 25 group by orderkey");
    }
    finally {
        // Clean up in reverse creation order; IF EXISTS keeps this safe when a
        // CREATE above failed before the table existed.
        assertUpdate("DROP TABLE IF EXISTS small_unbucketed_table");
        assertUpdate("DROP TABLE IF EXISTS big_bucketed_table");
    }
}

@Test
public void testRcTextCharDecoding()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,9 @@ public FixedSourcePartitionedScheduler(
else {
LifespanScheduler lifespanScheduler;
if (bucketNodeMap.isDynamic()) {
// Callee of the constructor guarantees dynamic bucket node map will only be
// used when the stage has no remote source.
//
// When the stage has no remote source, any scan is grouped execution guarantees
// all scan is grouped execution.
// Caller of the constructor guarantees dynamic bucket node map will only be
// used when the stage has no non-replicated remote sources and all scans use grouped
// execution.
lifespanScheduler = new DynamicLifespanScheduler(bucketNodeMap, nodes, partitionHandles, concurrentLifespansPerTask);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.Node;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SplitContext;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManager;
Expand Down Expand Up @@ -67,6 +68,7 @@
import static com.facebook.presto.execution.scheduler.NodeSelectionHashStrategy.CONSISTENT_HASHING;
import static com.facebook.presto.metadata.InternalNode.NodeStatus.ALIVE;
import static com.facebook.presto.spi.NodeState.ACTIVE;
import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Suppliers.memoizeWithExpiration;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -75,6 +77,7 @@
import static java.lang.Math.addExact;
import static java.lang.Math.ceil;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -411,7 +414,11 @@ public static SplitPlacementResult selectDistributionNodes(
Set<InternalNode> blockedNodes = new HashSet<>();
for (Split split : splits) {
// node placement is forced by the bucket to node map
InternalNode node = bucketNodeMap.getAssignedNode(split).get();
Optional<InternalNode> optionalNode = bucketNodeMap.getAssignedNode(split);
if (!optionalNode.isPresent()) {
throw new PrestoException(NO_NODES_AVAILABLE, format("No assignment for split in bucketNodeMap. Split Info: %s", split.getConnectorSplit().getInfoMap()));
}
InternalNode node = optionalNode.get();
boolean isCacheable = bucketNodeMap.isSplitCacheable(split);
SplitWeight splitWeight = split.getSplitWeight();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,10 @@ else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
}
else {
// cannot use dynamic lifespan schedule
verify(!plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule());
verify(!plan.getFragment().getStageExecutionDescriptor().isDynamicLifespanSchedule(),
"Stage was planned with DYNAMIC_LIFESPAN_SCHEDULE_GROUPED_EXECUTION, but is not eligible.");

// remote source requires nodePartitionMap
// Partitioned remote source requires nodePartitionMap
NodePartitionMap nodePartitionMap = partitioningCache.apply(plan.getFragment().getPartitioning());
if (groupedExecutionForStage) {
checkState(connectorPartitionHandles.size() == nodePartitionMap.getBucketToPartition().length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

Expand Down Expand Up @@ -149,7 +150,8 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan,
PlanFragment fragment = subPlan.getFragment();
GroupedExecutionTagger.GroupedExecutionProperties properties = fragment.getRoot().accept(new GroupedExecutionTagger(session, metadata, nodePartitioningManager), null);
if (properties.isSubTreeUseful()) {
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE);
boolean preferDynamic = fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)
&& new HashSet<>(properties.getCapableTableScanNodes()).containsAll(fragment.getTableScanSchedulingOrder());
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, fragment.getPartitioning(), preferDynamic);
if (bucketNodeMap.isDynamic()) {
/*
Expand Down