Skip to content

Commit 71af5a3

Browse files
Yury-FridlyandMaxKsyunzGabeFernandez310MaxKsyunz
authored
Support pagination in V2 engine, phase 1 (#226)
* Fixing integration tests broken during POC Signed-off-by: MaxKsyunz <[email protected]> * Comment to clarify an exception. Signed-off-by: MaxKsyunz <[email protected]> * Add support for paginated scroll request, first page. Implement PaginatedPlanCache.convertToPlan for second page to work. Signed-off-by: MaxKsyunz <[email protected]> * Progress on paginated scroll request, subsequent page. Signed-off-by: MaxKsyunz <[email protected]> * Move `ExpressionSerializer` from `opensearch` to `core`. Signed-off-by: Yury-Fridlyand <[email protected]> * Rename `Cursor` `asString` to `toString`. Signed-off-by: Yury-Fridlyand <[email protected]> * Disable scroll cleaning. Signed-off-by: Yury-Fridlyand <[email protected]> * Add full cursor serialization and deserialization. Signed-off-by: Yury-Fridlyand <[email protected]> * Misc fixes. Signed-off-by: Yury-Fridlyand <[email protected]> * Further work on pagination. * Added push down page size from `LogicalPaginate` to `LogicalRelation`. * Improved cursor encoding and decoding. * Added cursor compression. * Fixed issuing `SearchScrollRequest`. * Fixed returning last empty page. * Minor code grooming/commenting. Signed-off-by: Yury-Fridlyand <[email protected]> * Pagination fix for empty indices. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix error reporting on wrong cursor. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor comments and error reporting improvement. Signed-off-by: Yury-Fridlyand <[email protected]> * Add an end-to-end integration test. Signed-off-by: Yury-Fridlyand <[email protected]> * Add `explain` request handlers. Signed-off-by: Yury-Fridlyand <[email protected]> * Add IT for explain. Signed-off-by: Yury-Fridlyand <[email protected]> * Address issues flagged by checkstyle build step (#229) Signed-off-by: MaxKsyunz <[email protected]> * Pagination, phase 1: Add unit tests for `:core` module with coverage. (#230) * Add unit tests for `:core` module with coverage. Uncovered: `toCursor`, because it is will be changed soon. Signed-off-by: Yury-Fridlyand <[email protected]> * Pagination, phase 1: Add unit tests for SQL module with coverage. (#239) * Add unit tests for SQL module with coverage. Signed-off-by: Yury-Fridlyand <[email protected]> * Update sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java Signed-off-by: Yury-Fridlyand <[email protected]> Co-authored-by: GabeFernandez310 <[email protected]> --------- Signed-off-by: Yury-Fridlyand <[email protected]> Co-authored-by: GabeFernandez310 <[email protected]> * Pagination, phase 1: Add unit tests for `:opensearch` module with coverage. (#233) * Add UT for `:opensearch` module with full coverage, except `toCursor`. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix checkstyle. Signed-off-by: Yury-Fridlyand <[email protected]> --------- Signed-off-by: Yury-Fridlyand <[email protected]> * Fix the merges. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix explain. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix scroll cleaning. Signed-off-by: Yury-Fridlyand <[email protected]> * Store `TotalHits` and use it to report `total` in response. Signed-off-by: Yury-Fridlyand <[email protected]> * Add missing UT for `:protocol` module. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix PPL UTs damaged in f4ea4ad. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor checkstyle fixes. Signed-off-by: Yury-Fridlyand <[email protected]> * Fallback to v1 engine for pagination (#245) * Pagination fallback integration tests. Signed-off-by: MaxKsyunz <[email protected]> * Add UT with coverage for `toCursor` serialization. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix broken tests in `legacy`. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix getting `total` from non-paged requests and from queries without `FROM` clause. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix scroll cleaning. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix cursor request processing. Signed-off-by: Yury-Fridlyand <[email protected]> * Update ITs. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix (again) TotalHits feature. Signed-off-by: Yury-Fridlyand <[email protected]> * Fix typo in prometheus config. Signed-off-by: Yury-Fridlyand <[email protected]> * Recover commented logging. Signed-off-by: Yury-Fridlyand <[email protected]> * Move `test_pagination_blackbox` to a separate class and add logging. Signed-off-by: Yury-Fridlyand <[email protected]> * Address some PR feedbacks: rename some classes and revert unnecessary whitespace changed. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor commenting. Signed-off-by: Yury-Fridlyand <[email protected]> * Address PR comments. * Add javadocs * Renames * Cleaning up some comments * Remove unused code * Speed up IT Signed-off-by: Yury-Fridlyand <[email protected]> * Minor missing changes. Signed-off-by: Yury-Fridlyand <[email protected]> * Integration tests for fetch_size, max_result_window, and query.size_limit (#248) Signed-off-by: MaxKsyunz <[email protected]> * Remove `PaginatedQueryService`, extend `QueryService` to hold two planners and use them. Signed-off-by: Yury-Fridlyand <[email protected]> * Move push down functions from request builders to a new interface. Signed-off-by: Yury-Fridlyand <[email protected]> * Some file moves. Signed-off-by: Yury-Fridlyand <[email protected]> * Minor clean-up according to PR review. Signed-off-by: Yury-Fridlyand <[email protected]> --------- Signed-off-by: MaxKsyunz <[email protected]> Signed-off-by: Yury-Fridlyand <[email protected]> Co-authored-by: MaxKsyunz <[email protected]> Co-authored-by: GabeFernandez310 <[email protected]> Co-authored-by: Max Ksyunz <[email protected]>
1 parent 259b001 commit 71af5a3

File tree

146 files changed

+4936
-649
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

146 files changed

+4936
-649
lines changed

core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ dependencies {
5656
testImplementation('org.junit.jupiter:junit-jupiter:5.6.2')
5757
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.1'
5858
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.12.4'
59+
testImplementation group: 'org.mockito', name: 'mockito-inline', version: '3.12.4'
5960
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '3.12.4'
6061
}
6162

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.opensearch.sql.ast.tree.Kmeans;
5151
import org.opensearch.sql.ast.tree.Limit;
5252
import org.opensearch.sql.ast.tree.ML;
53+
import org.opensearch.sql.ast.tree.Paginate;
5354
import org.opensearch.sql.ast.tree.Parse;
5455
import org.opensearch.sql.ast.tree.Project;
5556
import org.opensearch.sql.ast.tree.RareTopN;
@@ -85,6 +86,7 @@
8586
import org.opensearch.sql.planner.logical.LogicalLimit;
8687
import org.opensearch.sql.planner.logical.LogicalML;
8788
import org.opensearch.sql.planner.logical.LogicalMLCommons;
89+
import org.opensearch.sql.planner.logical.LogicalPaginate;
8890
import org.opensearch.sql.planner.logical.LogicalPlan;
8991
import org.opensearch.sql.planner.logical.LogicalProject;
9092
import org.opensearch.sql.planner.logical.LogicalRareTopN;
@@ -529,6 +531,12 @@ public LogicalPlan visitML(ML node, AnalysisContext context) {
529531
return new LogicalML(child, node.getArguments());
530532
}
531533

534+
@Override
535+
public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
536+
LogicalPlan child = paginate.getChild().get(0).accept(this, context);
537+
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
538+
}
539+
532540
/**
533541
* The first argument is always "asc", others are optional.
534542
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.sql.ast.tree.Kmeans;
4848
import org.opensearch.sql.ast.tree.Limit;
4949
import org.opensearch.sql.ast.tree.ML;
50+
import org.opensearch.sql.ast.tree.Paginate;
5051
import org.opensearch.sql.ast.tree.Parse;
5152
import org.opensearch.sql.ast.tree.Project;
5253
import org.opensearch.sql.ast.tree.RareTopN;
@@ -289,4 +290,8 @@ public T visitQuery(Query node, C context) {
289290
public T visitExplain(Explain node, C context) {
290291
return visitStatement(node, context);
291292
}
293+
294+
public T visitPaginate(Paginate paginate, C context) {
295+
return visitChildren(paginate, context);
296+
}
292297
}

core/src/main/java/org/opensearch/sql/ast/statement/Query.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
public class Query extends Statement {
2828

2929
protected final UnresolvedPlan plan;
30+
protected final int fetchSize;
3031

3132
@Override
3233
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import java.util.List;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.ToString;
13+
import org.opensearch.sql.ast.AbstractNodeVisitor;
14+
import org.opensearch.sql.ast.Node;
15+
16+
/**
17+
* AST node to represent pagination operation.
18+
* Actually a wrapper to the AST.
19+
*/
20+
@RequiredArgsConstructor
21+
@EqualsAndHashCode(callSuper = false)
22+
@ToString
23+
public class Paginate extends UnresolvedPlan {
24+
@Getter
25+
private final int pageSize;
26+
private UnresolvedPlan child;
27+
28+
public Paginate(int pageSize, UnresolvedPlan child) {
29+
this.pageSize = pageSize;
30+
this.child = child;
31+
}
32+
33+
@Override
34+
public List<? extends Node> getChild() {
35+
return List.of(child);
36+
}
37+
38+
@Override
39+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
40+
return nodeVisitor.visitPaginate(this, context);
41+
}
42+
43+
@Override
44+
public UnresolvedPlan attach(UnresolvedPlan child) {
45+
this.child = child;
46+
return this;
47+
}
48+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.exception;
7+
8+
/**
9+
* This should be thrown by V2 engine to support fallback scenario.
10+
*/
11+
public class UnsupportedCursorRequestException extends RuntimeException {
12+
}

core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.opensearch.sql.common.response.ResponseListener;
1515
import org.opensearch.sql.data.model.ExprValue;
1616
import org.opensearch.sql.data.type.ExprType;
17+
import org.opensearch.sql.executor.pagination.Cursor;
1718
import org.opensearch.sql.planner.physical.PhysicalPlan;
1819

1920
/**
@@ -53,6 +54,8 @@ void execute(PhysicalPlan plan, ExecutionContext context,
5354
class QueryResponse {
5455
private final Schema schema;
5556
private final List<ExprValue> results;
57+
private final long total;
58+
private final Cursor cursor;
5659
}
5760

5861
@Data

core/src/main/java/org/opensearch/sql/executor/QueryId.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* Query id of {@link AbstractPlan}.
1717
*/
1818
public class QueryId {
19+
public static final QueryId None = new QueryId("");
1920
/**
2021
* Query id.
2122
*/

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
import org.opensearch.sql.common.response.ResponseListener;
1616
import org.opensearch.sql.planner.PlanContext;
1717
import org.opensearch.sql.planner.Planner;
18+
import org.opensearch.sql.planner.logical.LogicalPaginate;
1819
import org.opensearch.sql.planner.logical.LogicalPlan;
20+
import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer;
1921
import org.opensearch.sql.planner.physical.PhysicalPlan;
2022

2123
/**
@@ -28,7 +30,15 @@ public class QueryService {
2830

2931
private final ExecutionEngine executionEngine;
3032

33+
/**
34+
* There are two planners, one - to handle pagination requests (cursor/scroll) only and
35+
* another one for everything else.
36+
* @see OpenSearchPluginModule#queryPlanFactory (:plugin module)
37+
* @see LogicalPlanOptimizer#paginationCreate
38+
* @see QueryService
39+
*/
3140
private final Planner planner;
41+
private final Planner paginationPlanner;
3242

3343
/**
3444
* Execute the {@link UnresolvedPlan}, using {@link ResponseListener} to get response.
@@ -46,6 +56,14 @@ public void execute(UnresolvedPlan plan,
4656
}
4757
}
4858

59+
/**
60+
* Execute a physical plan without analyzing or planning anything.
61+
*/
62+
public void executePlan(PhysicalPlan plan,
63+
ResponseListener<ExecutionEngine.QueryResponse> listener) {
64+
executionEngine.execute(plan, ExecutionContext.emptyExecutionContext(), listener);
65+
}
66+
4967
/**
5068
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
5169
* to get response.
@@ -97,6 +115,6 @@ public LogicalPlan analyze(UnresolvedPlan plan) {
97115
* Translate {@link LogicalPlan} to {@link PhysicalPlan}.
98116
*/
99117
public PhysicalPlan plan(LogicalPlan plan) {
100-
return planner.plan(plan);
118+
return plan instanceof LogicalPaginate ? paginationPlanner.plan(plan) : planner.plan(plan);
101119
}
102120
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.execution;
7+
8+
import org.opensearch.sql.common.response.ResponseListener;
9+
import org.opensearch.sql.executor.ExecutionEngine;
10+
import org.opensearch.sql.executor.QueryId;
11+
import org.opensearch.sql.executor.QueryService;
12+
import org.opensearch.sql.executor.pagination.PaginatedPlanCache;
13+
import org.opensearch.sql.planner.physical.PhysicalPlan;
14+
15+
/**
16+
* ContinuePaginatedPlan represents cursor a request.
17+
* It returns subsequent pages to the user (2nd page and all next).
18+
* {@link PaginatedPlan}
19+
*/
20+
public class ContinuePaginatedPlan extends AbstractPlan {
21+
22+
private final String cursor;
23+
private final QueryService queryService;
24+
private final PaginatedPlanCache paginatedPlanCache;
25+
26+
private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener;
27+
28+
29+
/**
30+
* Create an abstract plan that can continue paginating a given cursor.
31+
*/
32+
public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService,
33+
PaginatedPlanCache planCache,
34+
ResponseListener<ExecutionEngine.QueryResponse>
35+
queryResponseListener) {
36+
super(queryId);
37+
this.cursor = cursor;
38+
this.paginatedPlanCache = planCache;
39+
this.queryService = queryService;
40+
this.queryResponseListener = queryResponseListener;
41+
}
42+
43+
@Override
44+
public void execute() {
45+
try {
46+
PhysicalPlan plan = paginatedPlanCache.convertToPlan(cursor);
47+
queryService.executePlan(plan, queryResponseListener);
48+
} catch (Exception e) {
49+
queryResponseListener.onFailure(e);
50+
}
51+
}
52+
53+
@Override
54+
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
55+
listener.onFailure(new UnsupportedOperationException(
56+
"Explain of a paged query continuation is not supported. "
57+
+ "Use `explain` for the initial query request."));
58+
}
59+
}

0 commit comments

Comments
 (0)