Skip to content

Commit

Permalink
Project operator pushdown (opendistro-for-elasticsearch#933)
Browse files Browse the repository at this point in the history
* init

* update

* update

* update doc
  • Loading branch information
penghuo committed Dec 16, 2020
1 parent fb98ea5 commit cad0776
Show file tree
Hide file tree
Showing 17 changed files with 476 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public T visitLiteral(LiteralExpression node, C context) {
}

/**
 * Visit a named expression by forwarding the visit to the expression it delegates to.
 *
 * @param node the named expression being visited.
 * @param context visitor context, passed through unchanged.
 * @return whatever visiting the delegated expression returns.
 */
public T visitNamed(NamedExpression node, C context) {
  return node.getDelegated().accept(this, context);
}

public T visitReference(ReferenceExpression node, C context) {
Expand Down
35 changes: 30 additions & 5 deletions docs/user/optimization/optimization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ The consecutive Filter operator will be merged as one Filter operator::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"bool\":{\"filter\":[{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},{\"range\":{\"age\":{\"from\":10,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}}],\"adjust_pure_negative\":true,\"boost\":1.0}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -71,7 +71,7 @@ The Filter operator should be push down under Sort operator::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":null,\"to\":20,\"include_lower\":true,\"include_upper\":false,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -85,6 +85,31 @@ Elasticsearch Specific Optimization

The Elasticsearch `Query DSL <https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_ and `Aggregation <https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html>`_ also enable storage-engine-specific optimizations.

Push Project Into Query DSL
---------------------------
The Project list will be pushed down into the Query DSL to `filter the source <https://www.elastic.co/guide/en/elasticsearch/reference/7.x/search-fields.html#source-filtering>`_::

sh$ curl -sS -H 'Content-Type: application/json' \
... -X POST localhost:9200/_opendistro/_sql/_explain \
... -d '{"query" : "SELECT age FROM accounts"}'
{
"root": {
"name": "ProjectOperator",
"description": {
"fields": "[age]"
},
"children": [
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
},
"children": []
}
]
}
}

Filter Merge Into Query DSL
---------------------------

Expand All @@ -103,7 +128,7 @@ The Filter operator will merge into Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"query\":{\"range\":{\"age\":{\"from\":30,\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, searchDone=false)"
},
"children": []
}
Expand All @@ -129,7 +154,7 @@ The Sort operator will merge into Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":0,\"size\":200,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]},\"sort\":[{\"age\":{\"order\":\"asc\",\"missing\":\"_first\"}}]}, searchDone=false)"
},
"children": []
}
Expand Down Expand Up @@ -191,7 +216,7 @@ The Limit operator will merge in Elasticsearch Query DSL::
{
"name": "ElasticsearchIndexScan",
"description": {
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\"}, searchDone=false)"
"request": "ElasticsearchQueryRequest(indexName=accounts, sourceBuilder={\"from\":5,\"size\":10,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"age\"],\"excludes\":[]}}, searchDone=false)"
},
"children": []
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlanNodeVisitor;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -54,7 +56,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan {
* Projection List.
*/
@Setter
private List<NamedExpression> projectList;
private Set<ReferenceExpression> projectList;

/**
* Sort List.
Expand All @@ -75,7 +77,7 @@ public class ElasticsearchLogicalIndexScan extends LogicalPlan {
public ElasticsearchLogicalIndexScan(
String relationName,
Expression filter,
List<NamedExpression> projectList,
Set<ReferenceExpression> projectList,
List<Pair<Sort.SortOption, Expression>> sortList,
Integer limit, Integer offset) {
super(ImmutableList.of());
Expand All @@ -95,4 +97,13 @@ public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
public boolean hasLimit() {
return limit != null;
}

/**
 * Check whether a non-empty project list is attached to this scan.
 *
 * @return true if the project list is non-null and holds at least one element,
 *     otherwise false.
 */
public boolean hasProjects() {
  if (projectList == null) {
    return false;
  }
  return !projectList.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexAgg;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndIndexScan;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.MergeSortAndRelation;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndIndexScan;
import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.PushProjectAndRelation;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.LogicalPlanOptimizer;
import java.util.Arrays;
import lombok.experimental.UtilityClass;
Expand All @@ -48,7 +50,9 @@ public static LogicalPlanOptimizer create() {
new MergeSortAndIndexAgg(),
new MergeSortAndIndexScan(),
new MergeLimitAndRelation(),
new
MergeLimitAndIndexScan()));
new MergeLimitAndIndexScan(),
new PushProjectAndRelation(),
new PushProjectAndIndexScan()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@
package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import com.amazon.opendistroforelasticsearch.sql.ast.tree.Sort;
import com.amazon.opendistroforelasticsearch.sql.expression.ExpressionNodeVisitor;
import com.amazon.opendistroforelasticsearch.sql.expression.NamedExpression;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalSort;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.experimental.UtilityClass;

@UtilityClass
Expand Down Expand Up @@ -50,4 +56,37 @@ public static boolean sortByDefaultOptionOnly(LogicalSort logicalSort) {
|| Sort.SortOption.DEFAULT_DESC.equals(sort.getLeft()))
.reduce(true, Boolean::logicalAnd);
}

/**
 * Collect the reference expressions found in each of the given named expressions.
 *
 * @param expressions a list of named expressions to inspect.
 * @return a set of ReferenceExpression gathered from all of them.
 */
public static Set<ReferenceExpression> findReferenceExpressions(
    List<NamedExpression> expressions) {
  Set<ReferenceExpression> references = new HashSet<>();
  expressions.forEach(named -> references.addAll(findReferenceExpression(named)));
  return references;
}

/**
 * Collect the reference expressions reached while visiting a named expression.
 *
 * @param expression the named expression to visit.
 * @return a list of ReferenceExpression, in the order the visitor reached them.
 */
public static List<ReferenceExpression> findReferenceExpression(
    NamedExpression expression) {
  final List<ReferenceExpression> collected = new ArrayList<>();
  ExpressionNodeVisitor<Object, Object> collector =
      new ExpressionNodeVisitor<Object, Object>() {
        @Override
        public Object visitReference(ReferenceExpression node, Object context) {
          // The boolean result of List#add is returned as the visit result.
          return collected.add(node);
        }
      };
  expression.accept(collector, null);
  return collected;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions;
import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source;
import static com.facebook.presto.matching.Pattern.typeOf;

import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule;
import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.Set;

/**
 * Push Project list into ElasticsearchLogicalIndexScan.
 *
 * <p>The rule matches a LogicalProject whose project list contains at least one
 * ReferenceExpression and whose source is an ElasticsearchLogicalIndexScan that has no
 * project list yet.
 */
public class PushProjectAndIndexScan implements Rule<LogicalProject> {

  private final Capture<ElasticsearchLogicalIndexScan> indexScanCapture;

  private final Pattern<LogicalProject> pattern;

  /**
   * Constructor of PushProjectAndIndexScan.
   */
  public PushProjectAndIndexScan() {
    this.indexScanCapture = Capture.newCapture();
    this.pattern = typeOf(LogicalProject.class)
        // Keep the predicate free of side effects: the references are recomputed in apply()
        // instead of being stashed in an instance field between matching and applying.
        .matching(project -> !findReferenceExpressions(project.getProjectList()).isEmpty())
        .with(source()
            .matching(typeOf(ElasticsearchLogicalIndexScan.class)
                .matching(indexScan -> !indexScan.hasProjects())
                .capturedAs(indexScanCapture)));
  }

  @Override
  public Pattern<LogicalProject> pattern() {
    return pattern;
  }

  /**
   * Set the referenced fields of the project list on the captured index scan.
   *
   * @param project the matched project operator.
   * @param captures captures produced by the pattern match, holding the index scan.
   * @return a new LogicalProject over the updated index scan with the same project list.
   */
  @Override
  public LogicalPlan apply(LogicalProject project,
                           Captures captures) {
    ElasticsearchLogicalIndexScan indexScan = captures.get(indexScanCapture);
    // Derived from the project being applied, so it can never be a stale value left over
    // from an earlier pattern evaluation.
    Set<ReferenceExpression> pushDownProjects =
        findReferenceExpressions(project.getProjectList());
    indexScan.setProjectList(pushDownProjects);
    return new LogicalProject(indexScan, project.getProjectList());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule;

import static com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.rule.OptimizationRuleUtils.findReferenceExpressions;
import static com.amazon.opendistroforelasticsearch.sql.planner.optimizer.pattern.Patterns.source;
import static com.facebook.presto.matching.Pattern.typeOf;

import com.amazon.opendistroforelasticsearch.sql.elasticsearch.planner.logical.ElasticsearchLogicalIndexScan;
import com.amazon.opendistroforelasticsearch.sql.expression.ReferenceExpression;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalPlan;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalProject;
import com.amazon.opendistroforelasticsearch.sql.planner.logical.LogicalRelation;
import com.amazon.opendistroforelasticsearch.sql.planner.optimizer.Rule;
import com.facebook.presto.matching.Capture;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.Set;

/**
 * Push Project list into Relation. The transformed plan is Project - IndexScan.
 *
 * <p>The rule matches a LogicalProject whose project list contains at least one
 * ReferenceExpression and whose source is a LogicalRelation.
 */
public class PushProjectAndRelation implements Rule<LogicalProject> {

  private final Capture<LogicalRelation> relationCapture;

  private final Pattern<LogicalProject> pattern;

  /**
   * Constructor of PushProjectAndRelation.
   */
  public PushProjectAndRelation() {
    this.relationCapture = Capture.newCapture();
    this.pattern = typeOf(LogicalProject.class)
        // Side-effect-free predicate: apply() computes the references itself, so nothing
        // needs to be remembered on the rule instance.
        .matching(project -> !findReferenceExpressions(project.getProjectList()).isEmpty())
        .with(source().matching(typeOf(LogicalRelation.class).capturedAs(relationCapture)));
  }

  @Override
  public Pattern<LogicalProject> pattern() {
    return pattern;
  }

  /**
   * Replace the relation with an index scan carrying the referenced fields of the project list.
   *
   * @param project the matched project operator.
   * @param captures captures produced by the pattern match, holding the relation.
   * @return a new LogicalProject over the built index scan with the same project list.
   */
  @Override
  public LogicalPlan apply(LogicalProject project,
                           Captures captures) {
    LogicalRelation relation = captures.get(relationCapture);
    Set<ReferenceExpression> pushDownProjects =
        findReferenceExpressions(project.getProjectList());
    return new LogicalProject(
        ElasticsearchLogicalIndexScan
            .builder()
            .relationName(relation.getRelationName())
            .projectList(pushDownProjects)
            .build(),
        project.getProjectList()
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ public PhysicalPlan visitIndexScan(ElasticsearchLogicalIndexScan node,
if (node.getLimit() != null) {
context.pushDownLimit(node.getLimit(), node.getOffset());
}

if (node.hasProjects()) {
context.pushDownProjects(node.getProjectList());
}
return indexScan;
}

Expand Down
Loading

0 comments on commit cad0776

Please sign in to comment.