Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
Expand Down Expand Up @@ -488,10 +489,7 @@ public PinotQueryGeneratorContext visitAggregation(AggregationNode node, PinotQu
// In Pql mode, the generated pql is `SELECT count(*) FROM myTable GROUP BY A, B`;
// In Sql mode, the generated sql is still `SELECT A, B FROM myTable GROUP BY A, B`.
if (!useSqlSyntax && groupByExists && aggregations == 0) {
VariableReferenceExpression hidden = new VariableReferenceExpression(UUID.randomUUID().toString(), BigintType.BIGINT);
newSelections.put(hidden, new Selection("count(*)", DERIVED));
outputs.add(hidden);
hiddenColumnSet.add(hidden);
setHiddenField(newSelections, outputs, hiddenColumnSet);
aggregations++;
}
return context.withAggregation(newSelections, outputs, groupByColumns, aggregations, hiddenColumnSet);
Expand All @@ -516,5 +514,40 @@ public PinotQueryGeneratorContext visitTopN(TopNNode node, PinotQueryGeneratorCo
checkSupported(node.getStep().equals(TopNNode.Step.SINGLE), "Can only push single logical topn in");
return context.withTopN(getOrderingScheme(node), node.getCount()).withOutputColumns(node.getOutputVariables());
}

@Override
public PinotQueryGeneratorContext visitDistinctLimit(DistinctLimitNode node, PinotQueryGeneratorContext context)
{
context = node.getSource().accept(this, context);
requireNonNull(context, "context is null");
checkSupported(!forbidBrokerQueries, "Cannot push distinctLimit in segment mode");
LinkedHashSet<VariableReferenceExpression> groupByColumns = new LinkedHashSet<>(node.getDistinctVariables());
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
if (!useSqlSyntax) {
// Handling PQL by adding hidden expression: count(*)
// E.g. `SELECT DISTINCT A, B FROM myTable LIMIT 10`
// In Pql mode, the generated pql is `SELECT count(*) FROM myTable GROUP BY A, B LIMIT 10`.
checkSupported(!context.hasAggregation(), "Aggregation already exists. Pinot doesn't support DistinctLimit with existing Aggregation");
checkSupported(!context.hasGroupBy(), "GroupBy already exists. Pinot doesn't support DistinctLimit with existing GroupBy");
Map<VariableReferenceExpression, Selection> newSelections = new HashMap<>(context.getSelections());
LinkedHashSet<VariableReferenceExpression> newOutputs = new LinkedHashSet<>(groupByColumns);
Set<VariableReferenceExpression> hiddenColumnSet = new HashSet<>();
setHiddenField(newSelections, newOutputs, hiddenColumnSet);
return context.withAggregation(newSelections, newOutputs, groupByColumns, 1, hiddenColumnSet).withLimit(node.getLimit());
}
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
// Handle SQL by setting the groupBy columns and output columns.
// E.g. `SELECT DISTINCT A, B FROM myTable LIMIT 10`
// In Sql mode, the generated sql is still `SELECT A, B FROM myTable GROUP BY A, B LIMIT 10`.
return context.withDistinctLimit(groupByColumns, node.getLimit()).withOutputColumns(node.getOutputVariables());
}

/**
 * Adds a hidden {@code count(*)} column to the given query-building collections.
 *
 * <p>A fresh BIGINT variable with a random UUID name is created, mapped to a derived
 * {@code count(*)} selection, appended to the outputs and recorded as hidden.
 * All three arguments are mutated in place.
 *
 * @param selections selection map that receives the {@code count(*)} entry
 * @param outputs ordered output variables; the new variable is appended
 * @param hiddenColumnSet set of hidden variables; the new variable is added
 */
private void setHiddenField(Map<VariableReferenceExpression, Selection> selections,
        LinkedHashSet<VariableReferenceExpression> outputs,
        Set<VariableReferenceExpression> hiddenColumnSet)
{
    // A random name keeps the synthetic column from clashing with real column names.
    String generatedName = UUID.randomUUID().toString();
    VariableReferenceExpression countStar = new VariableReferenceExpression(generatedName, BigintType.BIGINT);
    Selection countStarSelection = new Selection("count(*)", DERIVED);

    selections.put(countStar, countStarSelection);
    outputs.add(countStar);
    hiddenColumnSet.add(countStar);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public PinotQueryGeneratorContext withAggregation(
public PinotQueryGeneratorContext withProject(Map<VariableReferenceExpression, Selection> newSelections, LinkedHashSet<VariableReferenceExpression> newOutputs)
{
if (!useSqlSyntax) {
checkSupported(groupByColumns.isEmpty(), "Pinot doesn't yet support new selections on top of the grouped by data");
checkSupported(!hasGroupBy(), "Pinot doesn't yet support new selections on top of the grouped by data");
}
return new PinotQueryGeneratorContext(
newSelections,
Expand Down Expand Up @@ -278,7 +278,12 @@ private boolean hasLimit()
return limit.isPresent();
}

private boolean hasAggregation()
/**
 * Tells whether this context already carries GROUP BY columns.
 *
 * @return {@code true} when {@code groupByColumns} is not empty
 */
public boolean hasGroupBy()
{
return !groupByColumns.isEmpty();
}

/**
 * Tells whether this context already contains at least one aggregation.
 *
 * @return {@code true} when the {@code aggregations} counter is greater than zero
 */
public boolean hasAggregation()
{
return aggregations > 0;
}
Expand Down Expand Up @@ -316,11 +321,11 @@ public PinotQueryGenerator.GeneratedPinotQuery toPqlQuery(PinotConfig pinotConfi
int nonAggregateShortQueryLimit = PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session);
boolean isQueryShort = hasAggregation() || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit;
boolean forBroker = !PinotSessionProperties.isForbidBrokerQueries(session) && isQueryShort;
if (!pinotConfig.isAllowMultipleAggregations() && aggregations > 1 && !groupByColumns.isEmpty()) {
if (!pinotConfig.isAllowMultipleAggregations() && aggregations > 1 && hasGroupBy()) {
throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by is forbidden");
}

if (hasLimit() && aggregations > 1 && !groupByColumns.isEmpty()) {
if (hasLimit() && aggregations > 1 && hasGroupBy()) {
throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Multiple aggregates in the presence of group by and limit is forbidden");
}

Expand Down Expand Up @@ -352,7 +357,7 @@ public PinotQueryGenerator.GeneratedPinotQuery toPqlQuery(PinotConfig pinotConfi
}
limitKeyWord = "LIMIT";
}
else if (!groupByColumns.isEmpty()) {
else if (hasGroupBy()) {
limitKeyWord = "TOP";
if (limit.isPresent()) {
if (aggregations > 1) {
Expand Down Expand Up @@ -394,7 +399,7 @@ else if (!forBroker) {
query += TIME_BOUNDARY_FILTER_TEMPLATE;
}

if (!groupByColumns.isEmpty()) {
if (hasGroupBy()) {
String groupByExpr = groupByColumns.stream().map(x -> selections.get(x).getDefinition()).collect(Collectors.joining(", "));
query = query + " GROUP BY " + groupByExpr;
}
Expand All @@ -413,7 +418,7 @@ else if (!forBroker) {
public PinotQueryGenerator.GeneratedPinotQuery toSqlQuery(PinotConfig pinotConfig, ConnectorSession session)
{
int nonAggregateShortQueryLimit = PinotSessionProperties.getNonAggregateLimitForBrokerQueries(session);
boolean isQueryShort = (hasAggregation() || !groupByColumns.isEmpty()) || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit;
boolean isQueryShort = (hasAggregation() || hasGroupBy()) || limit.orElse(Integer.MAX_VALUE) < nonAggregateShortQueryLimit;
boolean forBroker = !PinotSessionProperties.isForbidBrokerQueries(session) && isQueryShort;
String expressions = outputs.stream()
.map(o -> selections.get(o).getDefinition())
Expand All @@ -428,15 +433,15 @@ public PinotQueryGenerator.GeneratedPinotQuery toSqlQuery(PinotConfig pinotConfi
// - Aggregation only query limit is ignored.
// - Fail if limit is invalid
int queryLimit = -1;
if (!hasAggregation() && groupByColumns.isEmpty()) {
if (!hasAggregation() && !hasGroupBy()) {
if (!limit.isPresent() && forBroker) {
throw new PinotException(PINOT_QUERY_GENERATOR_FAILURE, Optional.empty(), "Broker non aggregate queries have to have a limit");
}
else {
queryLimit = limit.orElseGet(pinotConfig::getLimitLargeForSegment);
}
}
else if (!groupByColumns.isEmpty()) {
else if (hasGroupBy()) {
if (limit.isPresent()) {
queryLimit = limit.getAsInt();
}
Expand Down Expand Up @@ -553,6 +558,27 @@ public PinotQueryGeneratorContext withVariablesInAggregation(Set<VariableReferen
useSqlSyntax);
}

public PinotQueryGeneratorContext withDistinctLimit(LinkedHashSet<VariableReferenceExpression> newGroupByColumns, long limit)
{
int intLimit = checkForValidLimit(limit);
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
checkSupported(useSqlSyntax, "DistinctLimit is only supported in SQL mode");
checkSupported(!hasLimit(), "Limit already exists. Pinot doesn't support limit on top of another limit");
checkSupported(!hasGroupBy(), "GroupBy already exists. Pinot doesn't support Distinct on top of another Group By");
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
checkSupported(!hasAggregation(), "Aggregation already exists. Pinot doesn't support Distinct Limit on top of Aggregation");
return new PinotQueryGeneratorContext(
selections,
outputs,
from,
filter,
aggregations,
newGroupByColumns,
topNColumnOrderingMap,
OptionalInt.of(intLimit),
variablesInAggregation,
hiddenColumnSet,
true);
}

/**
* Where is the selection/projection originated from
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.function.StandardFunctionResolution;
import com.facebook.presto.spi.plan.Assignments;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.FilterNode;
import com.facebook.presto.spi.plan.LimitNode;
import com.facebook.presto.spi.plan.MarkDistinctNode;
Expand Down Expand Up @@ -225,6 +226,11 @@ protected LimitNode limit(PlanBuilder pb, long count, PlanNode source)
return new LimitNode(pb.getIdAllocator().getNextId(), source, count, FINAL);
}

/**
 * Test helper that wraps {@code source} in a {@code DistinctLimitNode}.
 *
 * @param pb plan builder supplying the next plan node id
 * @param distinctVariables variables whose distinct combinations are selected
 * @param count the limit applied to the distinct rows
 * @param source the child plan node
 * @return the newly created distinct-limit node
 */
protected DistinctLimitNode distinctLimit(PlanBuilder pb, List<VariableReferenceExpression> distinctVariables, long count, PlanNode source)
{
    return new DistinctLimitNode(
            pb.getIdAllocator().getNextId(),
            source,
            count,
            false, // NOTE(review): presumably the "partial" flag - confirm against the constructor
            distinctVariables,
            Optional.empty()); // NOTE(review): presumably "no hash variable" - confirm against the constructor
}

protected TopNNode topN(PlanBuilder pb, long count, List<String> orderingColumns, List<Boolean> ascending, PlanNode source)
{
ImmutableList<Ordering> ordering = IntStream.range(0, orderingColumns.size()).boxed().map(i -> new Ordering(variable(orderingColumns.get(i)), ascending.get(i) ? SortOrder.ASC_NULLS_FIRST : SortOrder.DESC_NULLS_FIRST)).collect(toImmutableList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class TestPinotPlanOptimizer
new RowExpressionDeterminismEvaluator(functionMetadataManager),
new FunctionResolution(functionMetadataManager),
functionMetadataManager);
private final PinotTableHandle pinotTable = TestPinotSplitManager.hybridTable;
protected final PinotTableHandle pinotTable = TestPinotSplitManager.hybridTable;
protected final SessionHolder defaultSessionHolder = getDefaultSessionHolder();

public SessionHolder getDefaultSessionHolder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@
*/
package com.facebook.presto.pinot.query;

import com.facebook.presto.spi.plan.PlanNode;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.iterative.rule.test.PlanBuilder;
import com.facebook.presto.testing.assertions.Assert;
import com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

import java.util.Optional;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;

public class TestPinotPlanOptimizerSql
extends TestPinotPlanOptimizer
{
Expand All @@ -30,4 +39,44 @@ public void assertUsingSqlSyntax()
{
Assert.assertEquals(defaultSessionHolder.getConnectorSession().getProperty("use_pinot_sql_for_broker_queries", Boolean.class).booleanValue(), true);
}

@Test
public void testDistinctLimitPushdown()
{
PlanBuilder planBuilder = createPlanBuilder(defaultSessionHolder);
PlanNode originalPlan = distinctLimit(
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
planBuilder,
ImmutableList.of(new VariableReferenceExpression("regionid", BIGINT)),
50L,
tableScan(planBuilder, pinotTable, regionId));
PlanNode optimized = getOptimizedPlan(planBuilder, originalPlan);
assertPlanMatch(
optimized,
PinotTableScanMatcher.match(
pinotTable,
Optional.of("SELECT regionId FROM hybrid GROUP BY regionId LIMIT 50"),
Optional.of(false),
originalPlan.getOutputVariables(),
useSqlSyntax()),
typeProvider);

planBuilder = createPlanBuilder(defaultSessionHolder);
originalPlan = distinctLimit(
planBuilder,
ImmutableList.of(
new VariableReferenceExpression("regionid", BIGINT),
new VariableReferenceExpression("city", VARCHAR)),
50L,
tableScan(planBuilder, pinotTable, regionId, city));
optimized = getOptimizedPlan(planBuilder, originalPlan);
assertPlanMatch(
optimized,
PinotTableScanMatcher.match(
pinotTable,
Optional.of("SELECT regionId, city FROM hybrid GROUP BY regionId, city LIMIT 50"),
Optional.of(false),
originalPlan.getOutputVariables(),
useSqlSyntax()),
typeProvider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.pinot.PinotTableHandle;
import com.facebook.presto.pinot.TestPinotQueryBase;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.DistinctLimitNode;
import com.facebook.presto.spi.plan.Ordering;
import com.facebook.presto.spi.plan.OrderingScheme;
import com.facebook.presto.spi.plan.PlanNode;
Expand All @@ -41,7 +42,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.TimestampType.TIMESTAMP;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -374,4 +377,41 @@ public void testAggregationWithOrderByPushDownInTopN()
TopNNode topN = new TopNNode(planBuilder.getIdAllocator().getNextId(), agg, 50L, new OrderingScheme(ImmutableList.of(new Ordering(variable("city"), SortOrder.DESC_NULLS_FIRST))), TopNNode.Step.FINAL);
testPinotQuery(pinotConfig, topN, "", defaultSessionHolder, ImmutableMap.of());
}

@Test
Comment thread
xiangfu0 marked this conversation as resolved.
Outdated
public void testDistinctLimitPushdown()
{
PlanBuilder planBuilder = createPlanBuilder(defaultSessionHolder);
DistinctLimitNode distinctLimitNode = distinctLimit(
planBuilder,
ImmutableList.of(new VariableReferenceExpression("regionid", BIGINT)),
50L,
tableScan(planBuilder, pinotTable, regionId));
testPinotQuery(
pinotConfig,
distinctLimitNode,
String.format("SELECT %s FROM realtimeOnly GROUP BY regionId %s 50", getExpectedDistinctOutput("regionId"), getGroupByLimitKey()),
defaultSessionHolder,
ImmutableMap.of());

planBuilder = createPlanBuilder(defaultSessionHolder);
distinctLimitNode = distinctLimit(
planBuilder,
ImmutableList.of(
new VariableReferenceExpression("regionid", BIGINT),
new VariableReferenceExpression("city", VARCHAR)),
50L,
tableScan(planBuilder, pinotTable, regionId, city));
testPinotQuery(
pinotConfig,
distinctLimitNode,
String.format("SELECT %s FROM realtimeOnly GROUP BY regionId, city %s 50", getExpectedDistinctOutput("regionId, city"), getGroupByLimitKey()),
defaultSessionHolder,
ImmutableMap.of());
}

/**
 * Returns the select-list text expected in the query generated for a distinct-limit.
 *
 * <p>This implementation ignores {@code groupKeys} and always returns {@code count(*)}.
 * The method is protected so a subclass can supply a different expectation.
 *
 * @param groupKeys comma-separated distinct column names (unused here)
 * @return the literal {@code "count(*)"}
 */
protected String getExpectedDistinctOutput(String groupKeys)
{
return "count(*)";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,10 @@ public void testDistinctSelection()
defaultSessionHolder,
ImmutableMap.of());
}

/**
 * Returns the select-list text expected for a distinct-limit query in this test class:
 * the distinct column names themselves.
 *
 * @param groupKeys comma-separated distinct column names
 * @return {@code groupKeys} unchanged
 */
@Override
protected String getExpectedDistinctOutput(String groupKeys)
{
return groupKeys;
}
}