forked from opensearch-project/sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Support pagination in V2 engine, phase 1 #226
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 42 commits
Commits
Show all changes
47 commits
Select commit
Hold shift + click to select a range
7e2dc80
Fixing integration tests broken during POC
d359751
Comment to clarify an exception.
9c3f7fe
Add support for paginated scroll request, first page.
1ee718b
Progress on paginated scroll request, subsequent page.
f3cade6
Move `ExpressionSerializer` from `opensearch` to `core`.
Yury-Fridlyand db342b8
Rename `Cursor` `asString` to `toString`.
Yury-Fridlyand c8f0935
Disable scroll cleaning.
Yury-Fridlyand fffc36d
Add full cursor serialization and deserialization.
Yury-Fridlyand d844977
Misc fixes.
Yury-Fridlyand 333432e
Further work on pagination.
Yury-Fridlyand 85c8825
Pagination fix for empty indices.
Yury-Fridlyand 484a8fe
Fix error reporting on wrong cursor.
Yury-Fridlyand cccce53
Minor comments and error reporting improvement.
Yury-Fridlyand 2895883
Add an end-to-end integration test.
Yury-Fridlyand dd6fcd6
Add `explain` request handlers.
Yury-Fridlyand 2a19e56
Add IT for explain.
Yury-Fridlyand a3ef2bf
Address issues flagged by checkstyle build step (#229)
2d29549
Pagination, phase 1: Add unit tests for `:core` module with coverage.…
Yury-Fridlyand 70ccfcb
Pagination, phase 1: Add unit tests for SQL module with coverage. (#239)
Yury-Fridlyand 803f50e
Pagination, phase 1: Add unit tests for `:opensearch` module with cov…
Yury-Fridlyand 27e1793
Fix the merges.
Yury-Fridlyand 304616d
Fix explain.
Yury-Fridlyand 1b5ab7e
Fix scroll cleaning.
Yury-Fridlyand f4ea4ad
Store `TotalHits` and use it to report `total` in response.
Yury-Fridlyand 7f0acdd
Add missing UT for `:protocol` module.
Yury-Fridlyand 2ce1626
Fix PPL UTs damaged in f4ea4ad8c.
Yury-Fridlyand b2e6e56
Minor checkstyle fixes.
Yury-Fridlyand c7ad219
Fallback to v1 engine for pagination (#245)
MaxKsyunz 981bc25
Add UT with coverage for `toCursor` serialization.
Yury-Fridlyand 960c039
Fix broken tests in `legacy`.
Yury-Fridlyand 4f0c176
Fix getting `total` from non-paged requests and from queries without …
Yury-Fridlyand bdd52a0
Fix scroll cleaning.
Yury-Fridlyand a16332f
Fix cursor request processing.
Yury-Fridlyand 9f9e873
Update ITs.
Yury-Fridlyand 3340e38
Fix (again) TotalHits feature.
Yury-Fridlyand 524f220
Fix typo in prometheus config.
Yury-Fridlyand 281f3cd
Recover commented logging.
Yury-Fridlyand ca76e1b
Move `test_pagination_blackbox` to a separate class and add logging.
Yury-Fridlyand c8fcd4e
Address some PR feedbacks: rename some classes and revert unnecessary…
Yury-Fridlyand ec5fb40
Minor commenting.
Yury-Fridlyand 4213388
Address PR comments.
Yury-Fridlyand ca20d16
Minor missing changes.
Yury-Fridlyand a58733e
Integration tests for fetch_size, max_result_window, and query.size_l…
75b8140
Remove `PaginatedQueryService`, extend `QueryService` to hold two pla…
Yury-Fridlyand ba81407
Move push down functions from request builders to a new interface.
Yury-Fridlyand cec012b
Some file moves.
Yury-Fridlyand 9032b3b
Minor clean-up according to PR review.
Yury-Fridlyand File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
48 changes: 48 additions & 0 deletions
48
core/src/main/java/org/opensearch/sql/ast/tree/Paginate.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import java.util.List; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.Node; | ||
|
|
||
| /** | ||
| * AST node to represent pagination operation. | ||
| * Actually a wrapper to the AST. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| @EqualsAndHashCode(callSuper = false) | ||
| @ToString | ||
| public class Paginate extends UnresolvedPlan { | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @Getter | ||
| private final int pageSize; | ||
| private UnresolvedPlan child; | ||
|
|
||
| public Paginate(int pageSize, UnresolvedPlan child) { | ||
| this.pageSize = pageSize; | ||
| this.child = child; | ||
| } | ||
|
|
||
| @Override | ||
| public List<? extends Node> getChild() { | ||
| return List.of(child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitPaginate(this, context); | ||
| } | ||
|
|
||
| @Override | ||
| public UnresolvedPlan attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
| } | ||
12 changes: 12 additions & 0 deletions
12
core/src/main/java/org/opensearch/sql/exception/UnsupportedCursorRequestException.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.exception; | ||
|
|
||
| /** | ||
| * This should be thrown by V2 engine to support fallback scenario. | ||
| */ | ||
| public class UnsupportedCursorRequestException extends RuntimeException { | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
65 changes: 65 additions & 0 deletions
65
core/src/main/java/org/opensearch/sql/executor/CanPaginateVisitor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor; | ||
|
|
||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.Node; | ||
| import org.opensearch.sql.ast.expression.AllFields; | ||
| import org.opensearch.sql.ast.tree.Project; | ||
| import org.opensearch.sql.ast.tree.Relation; | ||
|
|
||
| /** | ||
| * Use this unresolved plan visitor to check if a plan can be serialized by PaginatedPlanCache. | ||
MaxKsyunz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * If plan.accept(new CanPaginateVisitor(...)) returns true, | ||
| * then PaginatedPlanCache.convertToCursor will succeed. Otherwise, it will fail. | ||
| * The purpose of this visitor is to activate legacy engine fallback mechanism. | ||
| * Currently, the conditions are: | ||
| * - only projection of a relation is supported. | ||
| * - projection only has * (a.k.a. allFields). | ||
| * - Relation only scans one table | ||
| * - The table is an open search index. | ||
| * So it accepts only queries like `select * from $index` | ||
| * See PaginatedPlanCache.canConvertToCursor for usage. | ||
| */ | ||
| public class CanPaginateVisitor extends AbstractNodeVisitor<Boolean, Object> { | ||
|
|
||
| @Override | ||
| public Boolean visitRelation(Relation node, Object context) { | ||
| if (!node.getChild().isEmpty()) { | ||
| // Relation instance should never have a child, but check just in case. | ||
| return Boolean.FALSE; | ||
| } | ||
|
|
||
| return Boolean.TRUE; | ||
| } | ||
|
|
||
| @Override | ||
| public Boolean visitChildren(Node node, Object context) { | ||
| return Boolean.FALSE; | ||
| } | ||
|
|
||
| @Override | ||
| public Boolean visitProject(Project node, Object context) { | ||
| // Allow queries with 'SELECT *' only. Those restriction could be removed, but consider | ||
| // in-memory aggregation performed by window function (see WindowOperator). | ||
| // SELECT max(age) OVER (PARTITION BY city) ... | ||
| var projections = node.getProjectList(); | ||
| if (projections.size() != 1) { | ||
| return Boolean.FALSE; | ||
| } | ||
|
|
||
| if (!(projections.get(0) instanceof AllFields)) { | ||
| return Boolean.FALSE; | ||
| } | ||
|
|
||
| var children = node.getChild(); | ||
| if (children.size() != 1) { | ||
| return Boolean.FALSE; | ||
| } | ||
|
|
||
| return children.get(0).accept(this, context); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
162 changes: 162 additions & 0 deletions
162
core/src/main/java/org/opensearch/sql/executor/PaginatedPlanCache.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,162 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor; | ||
|
|
||
| import com.google.common.hash.HashCode; | ||
| import java.io.ByteArrayInputStream; | ||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.zip.GZIPInputStream; | ||
| import java.util.zip.GZIPOutputStream; | ||
| import lombok.RequiredArgsConstructor; | ||
| import org.opensearch.sql.ast.tree.UnresolvedPlan; | ||
| import org.opensearch.sql.expression.NamedExpression; | ||
| import org.opensearch.sql.expression.serialization.DefaultExpressionSerializer; | ||
| import org.opensearch.sql.opensearch.executor.Cursor; | ||
| import org.opensearch.sql.planner.physical.PaginateOperator; | ||
| import org.opensearch.sql.planner.physical.PhysicalPlan; | ||
| import org.opensearch.sql.planner.physical.ProjectOperator; | ||
| import org.opensearch.sql.storage.StorageEngine; | ||
| import org.opensearch.sql.storage.TableScanOperator; | ||
|
|
||
| /** | ||
| * This class is entry point to paged requests. It is responsible to cursor serialization | ||
| * and deserialization. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| public class PaginatedPlanCache { | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| public static final String CURSOR_PREFIX = "n:"; | ||
| private final StorageEngine storageEngine; | ||
|
|
||
| public boolean canConvertToCursor(UnresolvedPlan plan) { | ||
| return plan.accept(new CanPaginateVisitor(), null); | ||
| } | ||
|
|
||
| /** | ||
| * Converts a physical plan tree to a cursor. May cache plan related data somewhere. | ||
| */ | ||
| public Cursor convertToCursor(PhysicalPlan plan) throws IOException { | ||
| if (plan instanceof PaginateOperator) { | ||
| var cursor = plan.toCursor(); | ||
| if (cursor == null) { | ||
| return Cursor.None; | ||
| } | ||
| var raw = CURSOR_PREFIX + compress(cursor); | ||
| return new Cursor(raw.getBytes()); | ||
| } | ||
| return Cursor.None; | ||
| } | ||
|
|
||
| /** | ||
| * Compress serialized query plan. | ||
| * @param str string representing a query plan | ||
| * @return str compressed with gzip. | ||
| */ | ||
| String compress(String str) throws IOException { | ||
| if (str == null || str.length() == 0) { | ||
| return ""; | ||
| } | ||
| ByteArrayOutputStream out = new ByteArrayOutputStream(); | ||
| GZIPOutputStream gzip = new GZIPOutputStream(out); | ||
| gzip.write(str.getBytes()); | ||
| gzip.close(); | ||
| return HashCode.fromBytes(out.toByteArray()).toString(); | ||
| } | ||
|
|
||
| /** | ||
| * Decompresses a query plan that was compress with {@link PaginatedPlanCache#compress}. | ||
| * @param input compressed query plan | ||
| * @return decompressed string | ||
| */ | ||
| String decompress(String input) throws IOException { | ||
| if (input == null || input.length() == 0) { | ||
| return ""; | ||
| } | ||
| GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream( | ||
| HashCode.fromString(input).asBytes())); | ||
| return new String(gzip.readAllBytes()); | ||
| } | ||
|
|
||
| /** | ||
| * Parse `NamedExpression`s from cursor. | ||
| * @param listToFill List to fill with data. | ||
| * @param cursor Cursor to parse. | ||
| * @return Remaining part of the cursor. | ||
| */ | ||
| private String parseNamedExpressions(List<NamedExpression> listToFill, String cursor) { | ||
| var serializer = new DefaultExpressionSerializer(); | ||
| if (cursor.startsWith(")")) { //empty list | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return cursor.substring(cursor.indexOf(',') + 1); | ||
| } | ||
| while (!cursor.startsWith("(")) { | ||
| listToFill.add((NamedExpression) | ||
| serializer.deserialize(cursor.substring(0, | ||
| Math.min(cursor.indexOf(','), cursor.indexOf(')'))))); | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| } | ||
| return cursor; | ||
| } | ||
|
|
||
| /** | ||
| * Converts a cursor to a physical plan tree. | ||
| */ | ||
| public PhysicalPlan convertToPlan(String cursor) { | ||
| if (!cursor.startsWith(CURSOR_PREFIX)) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| try { | ||
| cursor = cursor.substring(CURSOR_PREFIX.length()); | ||
| cursor = decompress(cursor); | ||
|
|
||
| // TODO Parse with ANTLR or serialize as JSON/XML | ||
Yury-Fridlyand marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (!cursor.startsWith("(Paginate,")) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
| } | ||
| // TODO add checks for > 0 | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| final int currentPageIndex = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); | ||
|
|
||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| final int pageSize = Integer.parseInt(cursor, 0, cursor.indexOf(','), 10); | ||
|
|
||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| if (!cursor.startsWith("(Project,")) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
| } | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| if (!cursor.startsWith("(namedParseExpressions,")) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
| } | ||
|
|
||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| List<NamedExpression> namedParseExpressions = new ArrayList<>(); | ||
| cursor = parseNamedExpressions(namedParseExpressions, cursor); | ||
|
|
||
| List<NamedExpression> projectList = new ArrayList<>(); | ||
| if (!cursor.startsWith("(projectList,")) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
| } | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| cursor = parseNamedExpressions(projectList, cursor); | ||
|
|
||
| if (!cursor.startsWith("(OpenSearchPagedIndexScan,")) { | ||
| throw new UnsupportedOperationException("Unsupported cursor"); | ||
| } | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| var indexName = cursor.substring(0, cursor.indexOf(',')); | ||
| cursor = cursor.substring(cursor.indexOf(',') + 1); | ||
| var scrollId = cursor.substring(0, cursor.indexOf(')')); | ||
| TableScanOperator scan = storageEngine.getTableScan(indexName, scrollId); | ||
|
|
||
| return new PaginateOperator(new ProjectOperator(scan, projectList, namedParseExpressions), | ||
| pageSize, currentPageIndex); | ||
| } catch (Exception e) { | ||
| throw new UnsupportedOperationException("Unsupported cursor", e); | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.