Skip to content

Commit 1c7233c

Browse files
Max KsyunzYury-Fridlyand
andauthored
Merge OpenSearchPagedIndexScan and OpenSearchIndexScan (#1600)
Signed-off-by: MaxKsyunz <[email protected]> Signed-off-by: Max Ksyunz <[email protected]> Co-authored-by: Yury-Fridlyand <[email protected]>
1 parent 3d657c3 commit 1c7233c

File tree

81 files changed

+1571
-2268
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

81 files changed

+1571
-2268
lines changed

core/src/main/java/org/opensearch/sql/analysis/Analyzer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.opensearch.sql.analysis;
88

9+
import static org.opensearch.sql.analysis.DataSourceSchemaIdentifierNameResolver.DEFAULT_DATASOURCE_NAME;
910
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST;
1011
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_LAST;
1112
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC;
@@ -44,6 +45,7 @@
4445
import org.opensearch.sql.ast.tree.Aggregation;
4546
import org.opensearch.sql.ast.tree.Dedupe;
4647
import org.opensearch.sql.ast.tree.Eval;
48+
import org.opensearch.sql.ast.tree.FetchCursor;
4749
import org.opensearch.sql.ast.tree.Filter;
4850
import org.opensearch.sql.ast.tree.Head;
4951
import org.opensearch.sql.ast.tree.Kmeans;
@@ -64,7 +66,6 @@
6466
import org.opensearch.sql.common.antlr.SyntaxCheckException;
6567
import org.opensearch.sql.data.model.ExprMissingValue;
6668
import org.opensearch.sql.data.type.ExprCoreType;
67-
import org.opensearch.sql.data.type.ExprType;
6869
import org.opensearch.sql.datasource.DataSourceService;
6970
import org.opensearch.sql.exception.SemanticCheckException;
7071
import org.opensearch.sql.expression.DSL;
@@ -84,6 +85,7 @@
8485
import org.opensearch.sql.planner.logical.LogicalAggregation;
8586
import org.opensearch.sql.planner.logical.LogicalDedupe;
8687
import org.opensearch.sql.planner.logical.LogicalEval;
88+
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
8789
import org.opensearch.sql.planner.logical.LogicalFilter;
8890
import org.opensearch.sql.planner.logical.LogicalLimit;
8991
import org.opensearch.sql.planner.logical.LogicalML;
@@ -211,7 +213,6 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
211213
tableFunctionImplementation.applyArguments());
212214
}
213215

214-
215216
@Override
216217
public LogicalPlan visitLimit(Limit node, AnalysisContext context) {
217218
LogicalPlan child = node.getChild().get(0).accept(this, context);
@@ -587,4 +588,9 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
587588
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
588589
}
589590

591+
@Override
592+
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
593+
return new LogicalFetchCursor(cursor.getCursor(),
594+
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
595+
}
590596
}

core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.opensearch.sql.ast.tree.Aggregation;
4444
import org.opensearch.sql.ast.tree.Dedupe;
4545
import org.opensearch.sql.ast.tree.Eval;
46+
import org.opensearch.sql.ast.tree.FetchCursor;
4647
import org.opensearch.sql.ast.tree.Filter;
4748
import org.opensearch.sql.ast.tree.Head;
4849
import org.opensearch.sql.ast.tree.Kmeans;
@@ -299,4 +300,8 @@ public T visitExplain(Explain node, C context) {
299300
public T visitPaginate(Paginate paginate, C context) {
300301
return visitChildren(paginate, context);
301302
}
303+
304+
public T visitFetchCursor(FetchCursor cursor, C context) {
305+
return visit(cursor, context);
306+
}
302307
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.ast.tree;
7+
8+
import lombok.EqualsAndHashCode;
9+
import lombok.Getter;
10+
import lombok.RequiredArgsConstructor;
11+
import org.opensearch.sql.ast.AbstractNodeVisitor;
12+
13+
/**
14+
* An unresolved plan that represents fetching the next
15+
* batch in paginationed plan.
16+
*/
17+
@RequiredArgsConstructor
18+
@EqualsAndHashCode(callSuper = false)
19+
public class FetchCursor extends UnresolvedPlan {
20+
@Getter
21+
final String cursor;
22+
23+
@Override
24+
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
25+
return nodeVisitor.visitFetchCursor(this, context);
26+
}
27+
28+
@Override
29+
public UnresolvedPlan attach(UnresolvedPlan child) {
30+
throw new UnsupportedOperationException("Cursor unresolved plan does not support children");
31+
}
32+
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,6 @@ public void execute(UnresolvedPlan plan,
4646
}
4747
}
4848

49-
/**
50-
* Execute a physical plan without analyzing or planning anything.
51-
*/
52-
public void executePlan(PhysicalPlan plan,
53-
ResponseListener<ExecutionEngine.QueryResponse> listener) {
54-
executionEngine.execute(plan, ExecutionContext.emptyExecutionContext(), listener);
55-
}
56-
5749
/**
5850
* Execute the {@link UnresolvedPlan}, with {@link PlanContext} and using {@link ResponseListener}
5951
* to get response.

core/src/main/java/org/opensearch/sql/executor/execution/ContinuePaginatedPlan.java

Lines changed: 0 additions & 58 deletions
This file was deleted.

core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
import org.opensearch.sql.ast.statement.Explain;
1818
import org.opensearch.sql.ast.statement.Query;
1919
import org.opensearch.sql.ast.statement.Statement;
20+
import org.opensearch.sql.ast.tree.FetchCursor;
21+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2022
import org.opensearch.sql.common.response.ResponseListener;
2123
import org.opensearch.sql.exception.UnsupportedCursorRequestException;
2224
import org.opensearch.sql.executor.ExecutionEngine;
2325
import org.opensearch.sql.executor.QueryId;
2426
import org.opensearch.sql.executor.QueryService;
25-
import org.opensearch.sql.executor.pagination.PlanSerializer;
27+
import org.opensearch.sql.executor.pagination.CanPaginateVisitor;
2628

2729
/**
2830
* QueryExecution Factory.
@@ -39,7 +41,6 @@ public class QueryPlanFactory
3941
* Query Service.
4042
*/
4143
private final QueryService queryService;
42-
private final PlanSerializer planSerializer;
4344

4445
/**
4546
* NO_CONSUMER_RESPONSE_LISTENER should never be called. It is only used as constructor
@@ -65,25 +66,28 @@ public void onFailure(Exception e) {
6566
/**
6667
* Create QueryExecution from Statement.
6768
*/
68-
public AbstractPlan createContinuePaginatedPlan(
69+
public AbstractPlan create(
6970
Statement statement,
7071
Optional<ResponseListener<ExecutionEngine.QueryResponse>> queryListener,
7172
Optional<ResponseListener<ExecutionEngine.ExplainResponse>> explainListener) {
7273
return statement.accept(this, Pair.of(queryListener, explainListener));
7374
}
7475

7576
/**
76-
* Creates a ContinuePaginatedPlan from a cursor.
77+
* Creates a QueryPlan from a cursor.
7778
*/
78-
public AbstractPlan createContinuePaginatedPlan(String cursor, boolean isExplain,
79-
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
80-
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
79+
public AbstractPlan create(String cursor, boolean isExplain,
80+
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener,
81+
ResponseListener<ExecutionEngine.ExplainResponse> explainListener) {
8182
QueryId queryId = QueryId.queryId();
82-
var plan = new ContinuePaginatedPlan(queryId, cursor, queryService,
83-
planSerializer, queryResponseListener);
83+
var plan = new QueryPlan(queryId, new FetchCursor(cursor), queryService, queryResponseListener);
8484
return isExplain ? new ExplainPlan(queryId, plan, explainListener) : plan;
8585
}
8686

87+
boolean canConvertToCursor(UnresolvedPlan plan) {
88+
return plan.accept(new CanPaginateVisitor(), null);
89+
}
90+
8791
@Override
8892
public AbstractPlan visitQuery(
8993
Query node,
@@ -94,7 +98,7 @@ public AbstractPlan visitQuery(
9498
context.getLeft().isPresent(), "[BUG] query listener must be not null");
9599

96100
if (node.getFetchSize() > 0) {
97-
if (planSerializer.canConvertToCursor(node.getPlan())) {
101+
if (canConvertToCursor(node.getPlan())) {
98102
return new QueryPlan(QueryId.queryId(), node.getPlan(), node.getFetchSize(),
99103
queryService,
100104
context.getLeft().get());
@@ -119,7 +123,7 @@ public AbstractPlan visitExplain(
119123

120124
return new ExplainPlan(
121125
QueryId.queryId(),
122-
createContinuePaginatedPlan(node.getStatement(),
126+
create(node.getStatement(),
123127
Optional.of(NO_CONSUMER_RESPONSE_LISTENER), Optional.empty()),
124128
context.getRight().get());
125129
}

core/src/main/java/org/opensearch/sql/executor/pagination/PlanSerializer.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.zip.GZIPInputStream;
1919
import java.util.zip.GZIPOutputStream;
2020
import lombok.RequiredArgsConstructor;
21-
import org.opensearch.sql.ast.tree.UnresolvedPlan;
2221
import org.opensearch.sql.exception.NoCursorException;
2322
import org.opensearch.sql.planner.SerializablePlan;
2423
import org.opensearch.sql.planner.physical.PhysicalPlan;
@@ -34,9 +33,6 @@ public class PlanSerializer {
3433

3534
private final StorageEngine engine;
3635

37-
public boolean canConvertToCursor(UnresolvedPlan plan) {
38-
return plan.accept(new CanPaginateVisitor(), null);
39-
}
4036

4137
/**
4238
* Converts a physical plan tree to a cursor.

core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66

77
package org.opensearch.sql.planner;
88

9+
import org.opensearch.sql.executor.pagination.PlanSerializer;
910
import org.opensearch.sql.planner.logical.LogicalAggregation;
1011
import org.opensearch.sql.planner.logical.LogicalDedupe;
1112
import org.opensearch.sql.planner.logical.LogicalEval;
13+
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
1214
import org.opensearch.sql.planner.logical.LogicalFilter;
1315
import org.opensearch.sql.planner.logical.LogicalLimit;
1416
import org.opensearch.sql.planner.logical.LogicalNested;
@@ -148,6 +150,11 @@ public PhysicalPlan visitRelation(LogicalRelation node, C context) {
148150
+ "implementing and optimizing logical plan with relation involved");
149151
}
150152

153+
@Override
154+
public PhysicalPlan visitFetchCursor(LogicalFetchCursor plan, C context) {
155+
return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor());
156+
}
157+
151158
protected PhysicalPlan visitChild(LogicalPlan node, C context) {
152159
// Logical operators visited here must have a single child
153160
return node.getChild().get(0).accept(this, context);

core/src/main/java/org/opensearch/sql/planner/SerializablePlan.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,6 @@
66
package org.opensearch.sql.planner;
77

88
import java.io.Externalizable;
9-
import java.io.IOException;
10-
import java.io.ObjectInput;
11-
import java.io.ObjectOutput;
12-
import org.opensearch.sql.executor.pagination.PlanSerializer;
139

1410
/**
1511
* All subtypes of PhysicalPlan which needs to be serialized (in cursor, for pagination feature)
@@ -29,21 +25,6 @@
2925
*/
3026
public interface SerializablePlan extends Externalizable {
3127

32-
/**
33-
* Argument is an instance of {@link PlanSerializer.CursorDeserializationStream}.
34-
*/
35-
@Override
36-
void readExternal(ObjectInput in) throws IOException, ClassNotFoundException;
37-
38-
/**
39-
* Each plan which has as a child plan should do.
40-
* <pre>{@code
41-
* out.writeObject(input.getPlanForSerialization());
42-
* }</pre>
43-
*/
44-
@Override
45-
void writeExternal(ObjectOutput out) throws IOException;
46-
4728
/**
4829
* Override to return child or delegated plan, so parent plan should skip this one
4930
* for serialization, but it should try to serialize grandchild plan.
@@ -55,6 +36,10 @@ public interface SerializablePlan extends Externalizable {
5536
* </pre>
5637
* In that case only plans A and C should be attempted to serialize.
5738
* It is needed to skip a `ResourceMonitorPlan` instance only, actually.
39+
*
40+
* <pre>{@code
41+
* * A.writeObject(B.getPlanForSerialization());
42+
* }</pre>
5843
* @return Next plan for serialization.
5944
*/
6045
default SerializablePlan getPlanForSerialization() {
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.planner.logical;
7+
8+
import java.util.List;
9+
import lombok.EqualsAndHashCode;
10+
import lombok.Getter;
11+
import lombok.ToString;
12+
import org.opensearch.sql.planner.logical.LogicalPlan;
13+
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
14+
import org.opensearch.sql.storage.StorageEngine;
15+
16+
@EqualsAndHashCode(callSuper = false)
17+
@ToString
18+
public class LogicalFetchCursor extends LogicalPlan {
19+
@Getter
20+
private final String cursor;
21+
22+
@Getter
23+
private final StorageEngine engine;
24+
25+
/**
26+
* LogicalCursor constructor. Does not have child plans.
27+
*/
28+
public LogicalFetchCursor(String cursor, StorageEngine engine) {
29+
super(List.of());
30+
this.cursor = cursor;
31+
this.engine = engine;
32+
}
33+
34+
@Override
35+
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
36+
return visitor.visitFetchCursor(this, context);
37+
}
38+
}

0 commit comments

Comments
 (0)