-
Notifications
You must be signed in to change notification settings - Fork 179
Support pagination in V2 engine, phase 1 #1497
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
e951192
4948ac4
37e7ebf
529df99
64386f3
bc8c73f
b9cb0d0
9a1a17c
e42bcd4
899d083
df54e79
ce509b0
dd3df8c
1f6cf70
e745d2d
0f66341
7dd445c
97b7435
f9ae484
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.ast.tree; | ||
|
|
||
| import java.util.List; | ||
| import lombok.EqualsAndHashCode; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.ToString; | ||
| import org.opensearch.sql.ast.AbstractNodeVisitor; | ||
| import org.opensearch.sql.ast.Node; | ||
|
|
||
| /** | ||
| * AST node to represent pagination operation. | ||
| * Actually a wrapper to the AST. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| @EqualsAndHashCode(callSuper = false) | ||
| @ToString | ||
| public class Paginate extends UnresolvedPlan { | ||
| @Getter | ||
| private final int pageSize; | ||
| private UnresolvedPlan child; | ||
|
|
||
| public Paginate(int pageSize, UnresolvedPlan child) { | ||
| this.pageSize = pageSize; | ||
| this.child = child; | ||
| } | ||
|
|
||
| @Override | ||
| public List<? extends Node> getChild() { | ||
| return List.of(child); | ||
| } | ||
|
|
||
| @Override | ||
| public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { | ||
| return nodeVisitor.visitPaginate(this, context); | ||
| } | ||
|
|
||
| @Override | ||
| public UnresolvedPlan attach(UnresolvedPlan child) { | ||
| this.child = child; | ||
| return this; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.exception; | ||
|
|
||
| /** | ||
| * This should be thrown on serialization of a PhysicalPlan tree if paging is finished. | ||
| * Processing of such exception should outcome of responding no cursor to the user. | ||
| */ | ||
| public class NoCursorException extends RuntimeException { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.exception; | ||
|
|
||
| /** | ||
| * This should be thrown by V2 engine to support fallback scenario. | ||
| */ | ||
| public class UnsupportedCursorRequestException extends RuntimeException { | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor.execution; | ||
|
|
||
| import org.opensearch.sql.common.response.ResponseListener; | ||
| import org.opensearch.sql.executor.ExecutionEngine; | ||
| import org.opensearch.sql.executor.QueryId; | ||
| import org.opensearch.sql.executor.QueryService; | ||
| import org.opensearch.sql.executor.pagination.PlanSerializer; | ||
| import org.opensearch.sql.planner.physical.PhysicalPlan; | ||
|
|
||
| /** | ||
| * ContinuePaginatedPlan represents cursor a request. | ||
| * It returns subsequent pages to the user (2nd page and all next). | ||
| * {@link PaginatedPlan} | ||
| */ | ||
| public class ContinuePaginatedPlan extends AbstractPlan { | ||
|
|
||
| private final String cursor; | ||
| private final QueryService queryService; | ||
| private final PlanSerializer planSerializer; | ||
|
|
||
| private final ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener; | ||
|
|
||
|
|
||
| /** | ||
| * Create an abstract plan that can continue paginating a given cursor. | ||
| */ | ||
| public ContinuePaginatedPlan(QueryId queryId, String cursor, QueryService queryService, | ||
| PlanSerializer planCache, | ||
| ResponseListener<ExecutionEngine.QueryResponse> | ||
| queryResponseListener) { | ||
| super(queryId); | ||
| this.cursor = cursor; | ||
| this.planSerializer = planCache; | ||
| this.queryService = queryService; | ||
| this.queryResponseListener = queryResponseListener; | ||
| } | ||
|
|
||
| @Override | ||
| public void execute() { | ||
| try { | ||
| PhysicalPlan plan = planSerializer.convertToPlan(cursor); | ||
| queryService.executePlan(plan, queryResponseListener); | ||
| } catch (Exception e) { | ||
| queryResponseListener.onFailure(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) { | ||
| listener.onFailure(new UnsupportedOperationException( | ||
| "Explain of a paged query continuation is not supported. " | ||
| + "Use `explain` for the initial query request.")); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.sql.executor.execution; | ||
|
|
||
| import org.apache.commons.lang3.NotImplementedException; | ||
| import org.opensearch.sql.ast.tree.Paginate; | ||
| import org.opensearch.sql.ast.tree.UnresolvedPlan; | ||
| import org.opensearch.sql.common.response.ResponseListener; | ||
| import org.opensearch.sql.executor.ExecutionEngine; | ||
| import org.opensearch.sql.executor.QueryId; | ||
| import org.opensearch.sql.executor.QueryService; | ||
|
|
||
| /** | ||
| * PaginatedPlan represents a page request. Dislike a regular QueryPlan, | ||
| * it returns paged response to the user and cursor, which allows to query | ||
| * next page. | ||
| * {@link ContinuePaginatedPlan} | ||
| */ | ||
| public class PaginatedPlan extends AbstractPlan { | ||
| final UnresolvedPlan plan; | ||
dai-chen marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| final int fetchSize; | ||
| final QueryService queryService; | ||
| final ResponseListener<ExecutionEngine.QueryResponse> | ||
| queryResponseResponseListener; | ||
|
|
||
| /** | ||
| * Create an abstract plan that can start paging a query. | ||
| */ | ||
| public PaginatedPlan(QueryId queryId, UnresolvedPlan plan, int fetchSize, | ||
|
||
| QueryService queryService, | ||
| ResponseListener<ExecutionEngine.QueryResponse> | ||
| queryResponseResponseListener) { | ||
| super(queryId); | ||
| this.plan = plan; | ||
| this.fetchSize = fetchSize; | ||
| this.queryService = queryService; | ||
| this.queryResponseResponseListener = queryResponseResponseListener; | ||
| } | ||
|
|
||
| @Override | ||
| public void execute() { | ||
| queryService.execute(new Paginate(fetchSize, plan), queryResponseResponseListener); | ||
| } | ||
|
|
||
| @Override | ||
| public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) { | ||
| listener.onFailure(new NotImplementedException( | ||
| "`explain` feature for paginated requests is not implemented yet.")); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -18,9 +18,11 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.ast.statement.Query; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.ast.statement.Statement; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.common.response.ResponseListener; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.exception.UnsupportedCursorRequestException; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.executor.ExecutionEngine; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.executor.QueryId; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.executor.QueryService; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| import org.opensearch.sql.executor.pagination.PlanSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * QueryExecution Factory. | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -37,9 +39,10 @@ public class QueryPlanFactory | |||||||||||||||||||||||||||||||||||||||||||||||||
| * Query Service. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private final QueryService queryService; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private final PlanSerializer planSerializer; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * NO_CONSUMER_RESPONSE_LISTENER should never been called. It is only used as constructor | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * parameter of {@link QueryPlan}. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -62,39 +65,62 @@ public void onFailure(Exception e) { | |||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * Create QueryExecution from Statement. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public AbstractPlan create( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public AbstractPlan createContinuePaginatedPlan( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Statement statement, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return statement.accept(this, Pair.of(queryListener, explainListener)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * Creates a ContinuePaginatedPlan from a cursor. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain, | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| ResponseListener<ExecutionEngine.ExplainResponse> explainListener) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| QueryId queryId = QueryId.queryId(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| var plan = new ContinuePaginatedPlan(queryId, cursor, queryService, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| planSerializer, queryResponseListener); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public AbstractPlan visitQuery( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Query node, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Pair< | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.QueryResponse>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.ExplainResponse>>> | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Pair<Optional<ResponseListener<ExecutionEngine.QueryResponse>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.ExplainResponse>>> | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkArgument( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context.getLeft().isPresent(), "[BUG] query listener must be not null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService, context.getLeft().get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (node.getFetchSize() > 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (planSerializer.canConvertToCursor(node.getPlan())) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return new PaginatedPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| queryService, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context.getLeft().get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // This should be picked up by the legacy engine. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throw new UnsupportedCursorRequestException(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return new QueryPlan(QueryId.queryId(), node.getPlan(), queryService, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context.getLeft().get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| @Override | ||||||||||||||||||||||||||||||||||||||||||||||||||
| public AbstractPlan visitExplain( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Explain node, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Pair< | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.QueryResponse>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.ExplainResponse>>> | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Pair<Optional<ResponseListener<ExecutionEngine.QueryResponse>>, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional<ResponseListener<ExecutionEngine.ExplainResponse>>> | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Preconditions.checkArgument( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context.getRight().isPresent(), "[BUG] explain listener must be not null"); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| return new ExplainPlan( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| QueryId.queryId(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| create(node.getStatement(), Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| createContinuePaginatedPlan(node.getStatement(), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()), | ||||||||||||||||||||||||||||||||||||||||||||||||||
| context.getRight().get()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.