Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
68db706
Remove OpenSearchPagedIndexScan and related classes.
Apr 28, 2023
ef93375
Some bug fixes
May 2, 2023
306a7fe
Updating tests
May 2, 2023
7d5b126
Updating PaginationWindowIT
May 2, 2023
dc2c471
Addressing Code Analysis warnings
May 2, 2023
91cea61
Do not serialize OpenSearchScrollRequest when needClean is true.
May 2, 2023
2ee810c
Fix checkstyle errors.
May 2, 2023
39ce902
Complete unit test coverage.
May 4, 2023
fabd4ee
Code improvements
May 4, 2023
6eb0498
Checkstyle fixes
May 4, 2023
bdfe563
Updating expected out in doctest
May 4, 2023
1f6b6f1
Refactoring
May 5, 2023
a8295e4
Checkstyle fixes
May 5, 2023
86429f8
Rename createContinuePaginatedPlan in QueryPlanFactory to create
May 5, 2023
4f5b69d
Address PR comments.
May 9, 2023
b1050dd
Address PR comments.
May 10, 2023
a16f2fa
WIP OpenSearchIndexScan refactor. Unit tests pass.
May 12, 2023
eff76b2
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
8548d90
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
f4770a8
WIP
May 17, 2023
e350662
Updating imports to reflect changes in opensearch core. (#1645)
May 23, 2023
413087a
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
7e812e5
Integrating with main.
May 24, 2023
1daacbb
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
f1e1342
Address refactoring comments WIP
May 25, 2023
73f18df
Restore error to explain requests containing only a cursor.
May 25, 2023
60226be
Complete test coverage.
May 26, 2023
fabb179
Address checkstyle issues.
May 26, 2023
23bc3ab
Better class name.
May 26, 2023
264e483
Update design document to reflect refactor.
May 26, 2023
97ef3a5
Addressed PR feedback.
May 26, 2023
2cecaca
Addressed PR feedback.
May 26, 2023
840c4a9
Minor cleanup.
May 26, 2023
4c9e958
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
102706b
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
2f4b48b
Update core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
May 26, 2023
33ad6dd
Minor cleanup 2
May 27, 2023
0abe298
Remove assertions that no longer apply
May 29, 2023
1dc3320
Minor cleanup
May 29, 2023
701bce7
Update test to account for prior changes
May 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
Expand All @@ -64,7 +65,6 @@
import org.opensearch.sql.common.antlr.SyntaxCheckException;
import org.opensearch.sql.data.model.ExprMissingValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.DSL;
Expand All @@ -80,6 +80,7 @@
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.LogicalCursor;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalDedupe;
Expand Down Expand Up @@ -211,6 +212,10 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
tableFunctionImplementation.applyArguments());
}

@Override
public LogicalPlan visitCursor(Cursor cursor, AnalysisContext context) {
return new LogicalCursor(cursor.getCursor());
}

@Override
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.Filter;
Expand Down Expand Up @@ -299,4 +300,8 @@ public T visitExplain(Explain node, C context) {
public T visitPaginate(Paginate paginate, C context) {
return visitChildren(paginate, context);
}

public T visitCursor(Cursor cursor, C context) {
return visit(cursor, context);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
return visit(cursor, context);
return visitChildren(cursor, context);

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lave for later

}
}
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/Cursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.sql.ast.tree;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.AbstractNodeVisitor;

@RequiredArgsConstructor
public class Cursor extends UnresolvedPlan {
@Getter
final String cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.Cursor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.exception.UnsupportedCursorRequestException;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;
import org.opensearch.sql.executor.pagination.CanPaginateVisitor;
import org.opensearch.sql.executor.pagination.PlanSerializer;

/**
Expand Down Expand Up @@ -65,7 +68,7 @@ public void onFailure(Exception e) {
/**
* Create QueryExecution from Statement.
*/
public AbstractPlan createContinuePaginatedPlan(
public AbstractPlan create(
Statement statement,
Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) {
Expand All @@ -75,15 +78,18 @@ public AbstractPlan createContinuePaginatedPlan(
/**
* Creates a ContinuePaginatedPlan from a cursor.
*/
public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
public AbstractPlan create(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
var plan = new ContinuePaginatedPlan(queryId, cursor, queryService,
planSerializer, queryResponseListener);
var plan = new QueryPlan(queryId, new Cursor(cursor), queryService, queryResponseListener);
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
}

boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

@Override
public AbstractPlan visitQuery(
Query node,
Expand All @@ -94,7 +100,7 @@ public AbstractPlan visitQuery(
context.getLeft().isPresent(), "[BUG] query listener must be not null");

if (node.getFetchSize() > 0) {
if (planSerializer.canConvertToCursor(node.getPlan())) {
if (canConvertToCursor(node.getPlan())) {
return new QueryPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(),
queryService,
context.getLeft().get());
Expand All @@ -119,7 +125,7 @@ public AbstractPlan visitExplain(

return new ExplainPlan(
QueryId.queryId(),
createContinuePaginatedPlan(node.getStatement(),
create(node.getStatement(),
Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.exception.NoCursorException;
import org.opensearch.sql.planner.SerializablePlan;
import org.opensearch.sql.planner.ExternalizablePlan;
import org.opensearch.sql.planner.physical.PhysicalPlan;
import org.opensearch.sql.storage.StorageEngine;

Expand All @@ -34,17 +33,14 @@ public class PlanSerializer {

private final StorageEngine engine;

public boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Converts a physical plan tree to a cursor.
*/
public Cursor convertToCursor(PhysicalPlan plan) {
try {
return new Cursor(CURSOR_PREFIX
+ serialize(((SerializablePlan) plan).getPlanForSerialization()));
+ serialize(((ExternalizablePlan) plan).getPlanForSerialization()));
// ClassCastException thrown when a plan in the tree doesn't implement SerializablePlan
} catch (NotSerializableException | ClassCastException | NoCursorException e) {
return Cursor.None;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
* </li>
* <li>
* Overwrite {@link #getPlanForSerialization} to return
* another instance of {@link SerializablePlan}.
* another instance of {@link ExternalizablePlan}.
* </li>
* </ul>
*/
public interface SerializablePlan extends Externalizable {
public interface ExternalizablePlan extends Externalizable {

/**
* Argument is an instance of {@link PlanSerializer.CursorDeserializationStream}.
Expand Down Expand Up @@ -57,7 +57,7 @@ public interface SerializablePlan extends Externalizable {
* It is needed to skip a `ResourceMonitorPlan` instance only, actually.
* @return Next plan for serialization.
*/
default SerializablePlan getPlanForSerialization() {
default ExternalizablePlan getPlanForSerialization() {
return this;
}
}
21 changes: 21 additions & 0 deletions core/src/main/java/org/opensearch/sql/planner/LogicalCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.opensearch.sql.planner;

import java.util.List;
import lombok.Getter;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;

public class LogicalCursor extends LogicalPlan {
@Getter
final String cursor;

public LogicalCursor(String cursor) {
super(List.of());
this.cursor = cursor;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCursor(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package org.opensearch.sql.planner.logical;

import org.opensearch.sql.planner.LogicalCursor;
import org.opensearch.sql.storage.read.TableScanBuilder;
import org.opensearch.sql.storage.write.TableWriteBuilder;

Expand Down Expand Up @@ -108,4 +109,8 @@ public R visitAD(LogicalAD plan, C context) {
public R visitPaginate(LogicalPaginate plan, C context) {
return visitNode(plan, context);
}

public R visitCursor(LogicalCursor plan, C context) {
return visitNode(plan, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
Expand Down Expand Up @@ -52,7 +51,7 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
new CreatePagingTableScanBuilder(),
new PushDownPageSize(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we aren't using TableScanPushDown.PUSH_DOWN_PAGESIZE? Is pagesize not a concept that the TableScan should know about?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushDownPageSize does not really fit into the simple pattern of "when node of this type is encountered, call this method" that other PUSH_DOWN_* rules use.

It's more similar to CreateTableScanBuilder rule.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it similar to PUSH_DOWN_LIMIT? and should be at the end of rule list?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should limit be at the end of the list? I did move PushDownPageSize to be after limit.

TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer;

import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;

/**
* A {@link LogicalPlanOptimizer} rule that pushes down page size
* to table scan builder.
*/
public class PushDownPageSize implements Rule<LogicalPaginate> {
@Override
public Pattern<LogicalPaginate> pattern() {
return Pattern.typeOf(LogicalPaginate.class)
.matching(lp -> findTableScanBuilder(lp).isPresent());
}

@Override
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {

var builder = findTableScanBuilder(plan).orElseThrow();
if (!builder.pushDownPageSize(plan)) {
throw new IllegalStateException("Failed to push down LogicalPaginate");
}
return plan.getChild().get(0);
}

private Optional<TableScanBuilder> findTableScanBuilder(LogicalPaginate logicalPaginate) {
Deque<LogicalPlan> plans = new ArrayDeque<>();
plans.add(logicalPaginate);
do {
final var plan = plans.removeFirst();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final var? should be val?

final var children = plan.getChild();
if (children.stream().anyMatch(TableScanBuilder.class::isInstance)) {
if (children.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported plan: relation operator cannot have siblings");
}
return Optional.of((TableScanBuilder) children.get(0));
}
plans.addAll(children);
} while (!plans.isEmpty());
return Optional.empty();
}
}
Loading