Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.isCheckAccessControlOnUtilizedColumnsOnly;
Expand Down Expand Up @@ -112,6 +113,7 @@ public class Analysis
private final Map<NodeRef<QuerySpecification>, List<FunctionCall>> windowFunctions = new LinkedHashMap<>();
private final Map<NodeRef<OrderBy>, List<FunctionCall>> orderByWindowFunctions = new LinkedHashMap<>();
private final Map<NodeRef<Offset>, Long> offset = new LinkedHashMap<>();
private final Map<NodeRef<Node>, OptionalLong> limit = new LinkedHashMap<>();

private final Map<NodeRef<Join>, Expression> joins = new LinkedHashMap<>();
private final Map<NodeRef<Join>, JoinUsingAnalysis> joinUsing = new LinkedHashMap<>();
Expand Down Expand Up @@ -349,6 +351,22 @@ public long getOffset(Offset node)
return offset.get(NodeRef.of(node));
}

public void setLimit(Node node, OptionalLong rowCount)
{
limit.put(NodeRef.of(node), rowCount);
}

public void setLimit(Node node, long rowCount)
{
limit.put(NodeRef.of(node), OptionalLong.of(rowCount));
}

public OptionalLong getLimit(Node node)
{
checkState(limit.containsKey(NodeRef.of(node)), "missing LIMIT value for node %s", node);
return limit.get(NodeRef.of(node));
}

public void setOutputExpressions(Node node, List<Expression> expressions)
{
outputExpressions.put(NodeRef.of(node), ImmutableList.copyOf(expressions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,7 @@ public enum SemanticErrorCode
TOO_MANY_GROUPING_SETS,

INVALID_OFFSET_ROW_COUNT,
INVALID_FETCH_FIRST_ROW_COUNT,
INVALID_LIMIT_ROW_COUNT,
MISSING_ORDER_BY,
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.facebook.presto.sql.tree.AllColumns;
import com.facebook.presto.sql.tree.AlterFunction;
import com.facebook.presto.sql.tree.Analyze;
import com.facebook.presto.sql.tree.AstVisitor;
import com.facebook.presto.sql.tree.Call;
import com.facebook.presto.sql.tree.Commit;
import com.facebook.presto.sql.tree.ComparisonExpression;
Expand Down Expand Up @@ -87,6 +88,7 @@
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.ExpressionRewriter;
import com.facebook.presto.sql.tree.ExpressionTreeRewriter;
import com.facebook.presto.sql.tree.FetchFirst;
import com.facebook.presto.sql.tree.FieldReference;
import com.facebook.presto.sql.tree.FrameBound;
import com.facebook.presto.sql.tree.FunctionCall;
Expand All @@ -104,6 +106,7 @@
import com.facebook.presto.sql.tree.JoinOn;
import com.facebook.presto.sql.tree.JoinUsing;
import com.facebook.presto.sql.tree.Lateral;
import com.facebook.presto.sql.tree.Limit;
import com.facebook.presto.sql.tree.Literal;
import com.facebook.presto.sql.tree.LogicalBinaryExpression;
import com.facebook.presto.sql.tree.LongLiteral;
Expand Down Expand Up @@ -169,6 +172,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -216,7 +220,9 @@
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.DUPLICATE_PARAMETER_NAME;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.DUPLICATE_PROPERTY;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.DUPLICATE_RELATION;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_FETCH_FIRST_ROW_COUNT;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_FUNCTION_NAME;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_LIMIT_ROW_COUNT;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_OFFSET_ROW_COUNT;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_ORDINAL;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.INVALID_PROCEDURE_ARGUMENTS;
Expand All @@ -229,6 +235,7 @@
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_CATALOG;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_COLUMN;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_MATERIALIZED_VIEW;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_ORDER_BY;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_SCHEMA;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MISSING_TABLE;
import static com.facebook.presto.sql.analyzer.SemanticErrorCode.MUST_BE_WINDOW_FUNCTION;
Expand Down Expand Up @@ -1086,6 +1093,13 @@ protected Scope visitQuery(Query node, Optional<Scope> scope)
if (node.getOffset().isPresent()) {
analyzeOffset(node.getOffset().get());
}

if (node.getLimit().isPresent()) {
boolean requiresOrderBy = analyzeLimit(node.getLimit().get());
if (requiresOrderBy && !node.getOrderBy().isPresent()) {
throw new SemanticException(MISSING_ORDER_BY, node.getLimit().get(), "FETCH FIRST WITH TIES clause requires ORDER BY");
}
}
// Input fields == Output fields
analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope));

Expand Down Expand Up @@ -1362,6 +1376,7 @@ private String getMaterializedViewSQL(
Map<SchemaTableName, Expression> partitionPredicates = generatePartitionPredicate(materializedViewStatus.getPartitionsFromBaseTables());

Query predicateStitchedQuery = (Query) new PredicateStitcher(session, partitionPredicates).process(createSqlStatement, new PredicateStitcherContext());

QuerySpecification materializedViewQuerySpecification = new QuerySpecification(
selectList(new AllColumns()),
((QuerySpecification) statement.getQueryBody()).getFrom(),
Expand Down Expand Up @@ -1535,6 +1550,13 @@ protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope>
}
analysis.setOrderByExpressions(node, orderByExpressions);

if (node.getLimit().isPresent()) {
boolean requiresOrderBy = analyzeLimit(node.getLimit().get());
if (requiresOrderBy && !node.getOrderBy().isPresent()) {
throw new SemanticException(MISSING_ORDER_BY, node.getLimit().get(), "FETCH FIRST WITH TIES clause requires ORDER BY");
}
}

List<Expression> sourceExpressions = new ArrayList<>(outputExpressions);
node.getHaving().ifPresent(sourceExpressions::add);

Expand Down Expand Up @@ -2649,6 +2671,79 @@ private void analyzeOffset(Offset node)
analysis.setOffset(node, rowCount);
}

private List<Expression> analyzeOrderBy(QuerySpecification node, Scope orderByScope, List<Expression> outputExpressions)
{
checkState(node.getOrderBy().isPresent(), "orderBy is absent");

List<SortItem> sortItems = getSortItemsFromOrderBy(node.getOrderBy());

if (node.getSelect().isDistinct()) {
verifySelectDistinct(node, outputExpressions);
}

return analyzeOrderBy(node, sortItems, orderByScope);
}

/**
* @return true if the Query / QuerySpecification containing the analyzed
* Limit or FetchFirst, must contain orderBy (i.e., for FetchFirst with ties).
*/
private boolean analyzeLimit(Node node)
{
checkState(
node instanceof FetchFirst || node instanceof Limit,
"Invalid limit node type. Expected: FetchFirst or Limit. Actual: %s", node.getClass().getName());

return new AstVisitor<Boolean, Void>()
{
@Override
protected Boolean visitFetchFirst(FetchFirst node, Void context)
{
if (!node.getRowCount().isPresent()) {
analysis.setLimit(node, 1);
}
else {
long rowCount;
try {
rowCount = Long.parseLong(node.getRowCount().get());
}
catch (NumberFormatException e) {
throw new SemanticException(INVALID_FETCH_FIRST_ROW_COUNT, node, "Invalid FETCH FIRST row count: %s", node.getRowCount().get());
}
if (rowCount <= 0) {
throw new SemanticException(INVALID_FETCH_FIRST_ROW_COUNT, node, "FETCH FIRST row count must be positive (actual value: %s)", rowCount);
}
analysis.setLimit(node, rowCount);
}

if (node.isWithTies()) {
return true;
}

return false;
}

@Override
protected Boolean visitLimit(Limit node, Void context)
{
if (node.getLimit().equalsIgnoreCase("all")) {
analysis.setLimit(node, OptionalLong.empty());
}
else {
long rowCount;
try {
rowCount = Long.parseLong(node.getLimit());
}
catch (NumberFormatException e) {
throw new SemanticException(INVALID_LIMIT_ROW_COUNT, node, "Invalid LIMIT row count: %s", node.getLimit());
}
analysis.setLimit(node, rowCount);
}
return false;
}
}.process(node, null);
}

private void verifySelectDistinct(QuerySpecification node, List<Expression> outputExpressions)
{
for (SortItem item : node.getOrderBy().get().getSortItems()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.facebook.presto.sql.planner.iterative.rule.GatherAndMergeWindows;
import com.facebook.presto.sql.planner.iterative.rule.ImplementBernoulliSampleAsFilter;
import com.facebook.presto.sql.planner.iterative.rule.ImplementFilteredAggregations;
import com.facebook.presto.sql.planner.iterative.rule.ImplementLimitWithTies;
import com.facebook.presto.sql.planner.iterative.rule.ImplementOffset;
import com.facebook.presto.sql.planner.iterative.rule.InlineProjections;
import com.facebook.presto.sql.planner.iterative.rule.InlineSqlFunctions;
Expand Down Expand Up @@ -334,7 +335,8 @@ public PlanOptimizers(
estimatedExchangesCostCalculator,
ImmutableSet.of(
new ImplementBernoulliSampleAsFilter(),
new ImplementOffset())),
new ImplementOffset(),
new ImplementLimitWithTies(metadata))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this to be controlled by a session param/feature config. In fact, I would like the whole feature to be optional.

simplifyOptimizer,
new UnaliasSymbolReferences(metadata.getFunctionAndTypeManager()),
new IterativeOptimizer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import com.facebook.presto.sql.tree.Cast;
import com.facebook.presto.sql.tree.Delete;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.FetchFirst;
import com.facebook.presto.sql.tree.FieldReference;
import com.facebook.presto.sql.tree.FrameBound;
import com.facebook.presto.sql.tree.FunctionCall;
Expand Down Expand Up @@ -146,9 +147,10 @@ public RelationPlan plan(Query query)
List<Expression> outputs = analysis.getOutputExpressions(query);
builder = handleSubqueries(builder, query, outputs);
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = sort(builder, query);
Optional<OrderingScheme> orderingScheme = orderingScheme(builder, query.getOrderBy(), analysis.getOrderByExpressions(query));
builder = sort(builder, orderingScheme);
builder = offset(builder, query.getOffset());
builder = limit(builder, query);
builder = limit(builder, query.getLimit(), orderingScheme);
builder = project(builder, analysis.getOutputExpressions(query));

return new RelationPlan(builder.getRoot(), analysis.getScope(query), computeOutputs(builder, analysis.getOutputExpressions(query)));
Expand Down Expand Up @@ -194,9 +196,10 @@ public RelationPlan plan(QuerySpecification node)
builder = project(builder, Iterables.concat(orderBy, outputs));

builder = distinct(builder, node);
builder = sort(builder, node);
Optional<OrderingScheme> orderingScheme = orderingScheme(builder, node.getOrderBy(), analysis.getOrderByExpressions(node));
builder = sort(builder, orderingScheme);
builder = offset(builder, node.getOffset());
builder = limit(builder, node);
builder = limit(builder, node.getLimit(), orderingScheme);
builder = project(builder, outputs);

return new RelationPlan(builder.getRoot(), analysis.getScope(node), computeOutputs(builder, outputs));
Expand Down Expand Up @@ -887,29 +890,17 @@ private PlanBuilder distinct(PlanBuilder subPlan, QuerySpecification node)
return subPlan;
}

private PlanBuilder sort(PlanBuilder subPlan, Query node)
{
return sort(subPlan, node.getOrderBy(), analysis.getOrderByExpressions(node));
}

private PlanBuilder sort(PlanBuilder subPlan, QuerySpecification node)
{
return sort(subPlan, node.getOrderBy(), analysis.getOrderByExpressions(node));
}

private PlanBuilder sort(PlanBuilder subPlan, Optional<OrderBy> orderBy, List<Expression> orderByExpressions)
private Optional<OrderingScheme> orderingScheme(PlanBuilder subPlan, Optional<OrderBy> orderBy, List<Expression> orderByExpressions)
{
if (!orderBy.isPresent() || (isSkipRedundantSort(session)) && analysis.isOrderByRedundant(orderBy.get())) {
return subPlan;
return Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

}

PlanNode planNode;
OrderingScheme orderingScheme = toOrderingScheme(
orderByExpressions.stream().map(subPlan::translate).collect(toImmutableList()),
orderBy.get().getSortItems().stream().map(PlannerUtils::toSortOrder).collect(toImmutableList()));
planNode = new SortNode(idAllocator.getNextId(), subPlan.getRoot(), orderingScheme, false);

return subPlan.withNewRoot(planNode);
return Optional.of(orderingScheme);
}

private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset)
Expand All @@ -925,25 +916,33 @@ private PlanBuilder offset(PlanBuilder subPlan, Optional<Offset> offset)
analysis.getOffset(offset.get())));
}

private PlanBuilder limit(PlanBuilder subPlan, Query node)
{
return limit(subPlan, node.getLimit());
}

private PlanBuilder limit(PlanBuilder subPlan, QuerySpecification node)
{
return limit(subPlan, node.getLimit());
}

private PlanBuilder limit(PlanBuilder subPlan, Optional<String> limit)
private PlanBuilder sort(PlanBuilder subPlan, Optional<OrderingScheme> orderingScheme)
{
if (!limit.isPresent()) {
if (!orderingScheme.isPresent()) {
return subPlan;
}
return subPlan.withNewRoot(
new SortNode(
idAllocator.getNextId(),
subPlan.getRoot(),
orderingScheme.get(),
false));
}

if (!limit.get().equalsIgnoreCase("all")) {
long limitValue = Long.parseLong(limit.get());
subPlan = subPlan.withNewRoot(new LimitNode(idAllocator.getNextId(), subPlan.getRoot(), limitValue, FINAL));
private PlanBuilder limit(PlanBuilder subPlan, Optional<Node> limit, Optional<OrderingScheme> orderingScheme)
{
if (limit.isPresent() && analysis.getLimit(limit.get()).isPresent()) {
Optional<OrderingScheme> tiesResolvingScheme = Optional.empty();
if (limit.get() instanceof FetchFirst && ((FetchFirst) limit.get()).isWithTies()) {
tiesResolvingScheme = orderingScheme;
}
return subPlan.withNewRoot(
new LimitNode(
idAllocator.getNextId(),
subPlan.getRoot(),
analysis.getLimit(limit.get()).getAsLong(),
tiesResolvingScheme,
FINAL));
}

return subPlan;
Expand Down
Loading