Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
68db706
Remove OpenSearchPagedIndexScan and related classes.
Apr 28, 2023
ef93375
Some bug fixes
May 2, 2023
306a7fe
Updating tests
May 2, 2023
7d5b126
Updating PaginationWindowIT
May 2, 2023
dc2c471
Addressing Code Analysis warnings
May 2, 2023
91cea61
Do not serialize OpenSearchScrollRequest when needClean is true.
May 2, 2023
2ee810c
Fix checkstyle errors.
May 2, 2023
39ce902
Complete unit test coverage.
May 4, 2023
fabd4ee
Code improvements
May 4, 2023
6eb0498
Checkstyle fixes
May 4, 2023
bdfe563
Updating expected out in doctest
May 4, 2023
1f6b6f1
Refactoring
May 5, 2023
a8295e4
Checkstyle fixes
May 5, 2023
86429f8
Rename createContinuePaginatedPlan in QueryPlanFactory to create
May 5, 2023
4f5b69d
Address PR comments.
May 9, 2023
b1050dd
Address PR comments.
May 10, 2023
a16f2fa
WIP OpenSearchIndexScan refactor. Unit tests pass.
May 12, 2023
eff76b2
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
8548d90
Refactor OpenSearchIndexScan and OpenSearchRequest.
May 16, 2023
f4770a8
WIP
May 17, 2023
e350662
Updating imports to reflect changes in opensearch core. (#1645)
May 23, 2023
413087a
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
7e812e5
Integrating with main.
May 24, 2023
1daacbb
Merge branch 'feature/pagination/integ' into feature/pagination/integ…
May 24, 2023
f1e1342
Address refactoring comments WIP
May 25, 2023
73f18df
Restore error to explain requests containing only a cursor.
May 25, 2023
60226be
Complete test coverage.
May 26, 2023
fabb179
Address checkstyle issues.
May 26, 2023
23bc3ab
Better class name.
May 26, 2023
264e483
Update design document to reflect refactor.
May 26, 2023
97ef3a5
Addressed PR feedback.
May 26, 2023
2cecaca
Addressed PR feedback.
May 26, 2023
840c4a9
Minor cleanup.
May 26, 2023
4c9e958
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
102706b
Update core/src/main/java/org/opensearch/sql/planner/logical/LogicalP…
May 26, 2023
2f4b48b
Update core/src/main/java/org/opensearch/sql/ast/tree/FetchCursor.java
May 26, 2023
33ad6dd
Minor cleanup 2
May 27, 2023
0abe298
Remove assertions that no longer apply
May 29, 2023
1dc3320
Minor cleanup
May 29, 2023
701bce7
Update test to account for prior changes
May 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void onFailure(Exception e) {
/**
* Create QueryExecution from Statement.
*/
public AbstractPlan createContinuePaginatedPlan(
public AbstractPlan create(
Statement statement,
Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener,
Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) {
Expand All @@ -75,9 +75,9 @@ public AbstractPlan createContinuePaginatedPlan(
/**
* Creates a ContinuePaginatedPlan from a cursor.
*/
public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
public AbstractPlan create(String cursor, boolean isExplain,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
QueryId queryId = QueryId.queryId();
var plan = new ContinuePaginatedPlan(queryId, cursor, queryService,
planSerializer, queryResponseListener);
Expand Down Expand Up @@ -119,7 +119,7 @@ public AbstractPlan visitExplain(

return new ExplainPlan(
QueryId.queryId(),
createContinuePaginatedPlan(node.getStatement(),
create(node.getStatement(),
Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
context.getRight().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.optimizer.rule.CreatePagingTableScanBuilder;
import org.opensearch.sql.planner.optimizer.rule.MergeFilterAndFilter;
import org.opensearch.sql.planner.optimizer.rule.PushFilterUnderSort;
import org.opensearch.sql.planner.optimizer.rule.read.CreateTableScanBuilder;
Expand Down Expand Up @@ -52,7 +51,7 @@ public static LogicalPlanOptimizer create() {
* Phase 2: Transformations that rely on data source push down capability
*/
new CreateTableScanBuilder(),
new CreatePagingTableScanBuilder(),
new PushDownPageSize(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we aren't using TableScanPushDown.PUSH_DOWN_PAGESIZE? Is pagesize not a concept that the TableScan should know about?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PushDownPageSize does not really fit into the simple pattern of "when node of this type is encountered, call this method" that other PUSH_DOWN_* rules use.

It's more similar to CreateTableScanBuilder rule.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it similar to PUSH_DOWN_LIMIT? and should be at the end of rule list?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why should limit be at the end of the list? I did move PushDownPageSize to be after limit.

TableScanPushDown.PUSH_DOWN_FILTER,
TableScanPushDown.PUSH_DOWN_AGGREGATION,
TableScanPushDown.PUSH_DOWN_SORT,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.optimizer;

import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.storage.read.TableScanBuilder;

public class PushDownPageSize implements Rule<LogicalPaginate> {
@Override
public Pattern<LogicalPaginate> pattern() {
return Pattern.typeOf(LogicalPaginate.class)
.matching(lp -> findTableScanBuilder(lp).isPresent());
}

@Override
public LogicalPlan apply(LogicalPaginate plan, Captures captures) {

var builder = findTableScanBuilder(plan).orElseThrow();
if (!builder.pushDownPageSize(plan)) {
throw new IllegalStateException("Failed to push down LogicalPaginate");
}
return plan.getChild().get(0);
}

private Optional<TableScanBuilder> findTableScanBuilder(LogicalPaginate logicalPaginate) {
Deque<LogicalPlan> plans = new ArrayDeque<>();
plans.add(logicalPaginate);
do {
final var plan = plans.removeFirst();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final var? should be val?

final var children = plan.getChild();
if (children.stream().anyMatch(TableScanBuilder.class::isInstance)) {
if (children.size() > 1) {
throw new UnsupportedOperationException(
"Unsupported plan: relation operator cannot have siblings");
}
return Optional.of((TableScanBuilder) children.get(0));
}
plans.addAll(children);
} while (!plans.isEmpty());
return Optional.empty();
}
}

This file was deleted.

4 changes: 0 additions & 4 deletions core/src/main/java/org/opensearch/sql/storage/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,4 @@ default StreamingSource asStreamingSource() {
throw new UnsupportedOperationException();
}

default TableScanBuilder createPagedScanBuilder(int pageSize) {
var error = String.format("'%s' does not support pagination", getClass().toString());
throw new UnsupportedOperationException(error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.sql.planner.logical.LogicalHighlight;
import org.opensearch.sql.planner.logical.LogicalLimit;
import org.opensearch.sql.planner.logical.LogicalNested;
import org.opensearch.sql.planner.logical.LogicalPaginate;
import org.opensearch.sql.planner.logical.LogicalPlan;
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.planner.logical.LogicalProject;
Expand All @@ -28,7 +29,7 @@ public abstract class TableScanBuilder extends LogicalPlan {
/**
* Construct and initialize children to empty list.
*/
public TableScanBuilder() {
protected TableScanBuilder() {
super(Collections.emptyList());
}

Expand Down Expand Up @@ -116,6 +117,10 @@ public boolean pushDownNested(LogicalNested nested) {
return false;
}

public boolean pushDownPageSize(LogicalPaginate paginate) {
return false;
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitTableScanBuilder(this, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,23 +62,23 @@ void init() {
public void createFromQueryShouldSuccess() {
Statement query = new Query(plan, 0);
AbstractPlan queryExecution =
factory.createContinuePaginatedPlan(query, Optional.of(queryListener), Optional.empty());
factory.create(query, Optional.of(queryListener), Optional.empty());
assertTrue(queryExecution instanceof QueryPlan);
}

@Test
public void createFromExplainShouldSuccess() {
Statement query = new Explain(new Query(plan, 0));
AbstractPlan queryExecution =
factory.createContinuePaginatedPlan(query, Optional.empty(), Optional.of(explainListener));
factory.create(query, Optional.empty(), Optional.of(explainListener));
assertTrue(queryExecution instanceof ExplainPlan);
}

@Test
public void createFromCursorShouldSuccess() {
AbstractPlan queryExecution = factory.createContinuePaginatedPlan("", false,
AbstractPlan queryExecution = factory.create("", false,
queryListener, explainListener);
AbstractPlan explainExecution = factory.createContinuePaginatedPlan("", true,
AbstractPlan explainExecution = factory.create("", true,
queryListener, explainListener);
assertAll(
() -> assertTrue(queryExecution instanceof ContinuePaginatedPlan),
Expand All @@ -91,7 +91,7 @@ public void createFromQueryWithoutQueryListenerShouldThrowException() {
Statement query = new Query(plan, 0);

IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> factory.createContinuePaginatedPlan(
assertThrows(IllegalArgumentException.class, () -> factory.create(
query, Optional.empty(), Optional.empty()));
assertEquals("[BUG] query listener must be not null", exception.getMessage());
}
Expand All @@ -101,7 +101,7 @@ public void createFromExplainWithoutExplainListenerShouldThrowException() {
Statement query = new Explain(new Query(plan, 0));

IllegalArgumentException exception =
assertThrows(IllegalArgumentException.class, () -> factory.createContinuePaginatedPlan(
assertThrows(IllegalArgumentException.class, () -> factory.create(
query, Optional.empty(), Optional.empty()));
assertEquals("[BUG] explain listener must be not null", exception.getMessage());
}
Expand Down Expand Up @@ -129,7 +129,7 @@ public void createQueryWithFetchSizeWhichCanBePaged() {
factory = new QueryPlanFactory(queryService, planSerializer);
Statement query = new Query(plan, 10);
AbstractPlan queryExecution =
factory.createContinuePaginatedPlan(query, Optional.of(queryListener), Optional.empty());
factory.create(query, Optional.of(queryListener), Optional.empty());
assertTrue(queryExecution instanceof QueryPlan);
}

Expand All @@ -139,7 +139,7 @@ public void createQueryWithFetchSizeWhichCannotBePaged() {
factory = new QueryPlanFactory(queryService, planSerializer);
Statement query = new Query(plan, 10);
assertThrows(UnsupportedCursorRequestException.class,
() -> factory.createContinuePaginatedPlan(query,
() -> factory.create(query,
Optional.of(queryListener), Optional.empty()));
}
}
Loading