Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.sql.ast.expression.UnresolvedExpression;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
Expand Down Expand Up @@ -83,6 +84,7 @@
import org.opensearch.sql.expression.parse.ParseExpression;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
Expand Down Expand Up @@ -572,6 +574,17 @@ public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) {
return new LogicalPaginate(paginate.getPageSize(), List.of(child));
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}

@Override
public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) {
return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context));
}

/**
* The first argument is always "asc", others are optional.
* Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC.
Expand All @@ -587,10 +600,4 @@ private SortOption analyzeSortOption(List<Argument> fieldArgs) {
}
return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC;
}

@Override
public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) {
return new LogicalFetchCursor(cursor.getCursor(),
dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
import org.opensearch.sql.ast.tree.Eval;
import org.opensearch.sql.ast.tree.FetchCursor;
Expand Down Expand Up @@ -302,6 +303,10 @@ public T visitPaginate(Paginate paginate, C context) {
}

public T visitFetchCursor(FetchCursor cursor, C context) {
return visit(cursor, context);
return visitChildren(cursor, context);
}

public T visitCloseCursor(CloseCursor closeCursor, C context) {
return visitChildren(closeCursor, context);
}
}
38 changes: 38 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import java.util.List;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.Node;

/**
* AST node to represent close cursor operation.
* Actually a wrapper to the AST.
*/
public class CloseCursor extends UnresolvedPlan {

/**
* An instance of {@link FetchCursor}.
*/
private UnresolvedPlan cursor;

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitCloseCursor(this, context);
}

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
this.cursor = child;
return this;
}

@Override
public List<? extends Node> getChild() {
return List.of(cursor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sql.executor.execution;

import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.QueryId;
import org.opensearch.sql.executor.QueryService;

/**
* Query plan which does not reflect a search query being executed.
* It contains a command or an action, for example, a DDL query.
*/
public class CommandPlan extends AbstractPlan {

/**
* The query plan ast.
*/
protected final UnresolvedPlan plan;

/**
* Query service.
*/
protected final QueryService queryService;

protected final ResponseListener<ExecutionEngine.QueryResponse> listener;

/** Constructor. */
public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService,
ResponseListener<ExecutionEngine.QueryResponse> listener) {
super(queryId);
this.plan = plan;
this.queryService = queryService;
this.listener = listener;
}

@Override
public void execute() {
queryService.execute(plan, listener);
}

@Override
public void explain(ResponseListener<ExecutionEngine.ExplainResponse> listener) {
throw new UnsupportedOperationException("CommandPlan does not support explain");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import org.opensearch.sql.executor.QueryService;

/**
* Query plan. Which includes.
*
* <p>select query.
* Query plan which includes a <em>select</em> query.
*/
public class QueryPlan extends AbstractPlan {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.ast.statement.Explain;
import org.opensearch.sql.ast.statement.Query;
import org.opensearch.sql.ast.statement.Statement;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.common.response.ResponseListener;
Expand Down Expand Up @@ -88,6 +89,15 @@ boolean canConvertToCursor(UnresolvedPlan plan) {
return plan.accept(new CanPaginateVisitor(), null);
}

/**
* Creates a {@link CloseCursor} command on a cursor.
*/
public AbstractPlan createCloseCursor(String cursor,
ResponseListener<ExecutionEngine.QueryResponse> queryResponseListener) {
return new CommandPlan(QueryId.queryId(), new CloseCursor().attach(new FetchCursor(cursor)),
queryService, queryResponseListener);
}

@Override
public AbstractPlan visitQuery(
Query node,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.planner;

import org.opensearch.sql.executor.pagination.PlanSerializer;
import org.opensearch.sql.planner.logical.LogicalAggregation;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalDedupe;
import org.opensearch.sql.planner.logical.LogicalEval;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
Expand All @@ -25,6 +25,7 @@
import org.opensearch.sql.planner.logical.LogicalValues;
import org.opensearch.sql.planner.logical.LogicalWindow;
import org.opensearch.sql.planner.physical.AggregationOperator;
import org.opensearch.sql.planner.physical.CursorCloseOperator;
import org.opensearch.sql.planner.physical.DedupeOperator;
import org.opensearch.sql.planner.physical.EvalOperator;
import org.opensearch.sql.planner.physical.FilterOperator;
Expand Down Expand Up @@ -155,6 +156,11 @@ public PhysicalPlan visitFetchCursor(LogicalFetchCursor plan, C context) {
return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor());
}

@Override
public PhysicalPlan visitCloseCursor(LogicalCloseCursor node, C context) {
return new CursorCloseOperator(visitChild(node, context));
}

protected PhysicalPlan visitChild(LogicalPlan node, C context) {
// Logical operators visited here must have a single child
return node.getChild().get(0).accept(this, context);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.logical;

import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* A logical plan node which wraps {@link org.opensearch.sql.planner.LogicalCursor}
* and represent a cursor close operation.
*/
@ToString
@EqualsAndHashCode(callSuper = false)
public class LogicalCloseCursor extends LogicalPlan {

public LogicalCloseCursor(LogicalPlan child) {
super(List.of(child));
}

@Override
public <R, C> R accept(LogicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCloseCursor(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
import org.opensearch.sql.storage.StorageEngine;

/**
* A plan node which represents operation of fetching a next page from the cursor.
*/
@EqualsAndHashCode(callSuper = false)
@ToString
public class LogicalFetchCursor extends LogicalPlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,8 @@ public R visitPaginate(LogicalPaginate plan, C context) {
public R visitFetchCursor(LogicalFetchCursor plan, C context) {
return visitNode(plan, context);
}

public R visitCloseCursor(LogicalCloseCursor plan, C context) {
return visitNode(plan, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.planner.physical;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.executor.ExecutionEngine;

/**
* A plan node which blocks issuing a request in {@link #open} and
* getting results in {@link #hasNext}, but doesn't block releasing resources in {@link #close}.
* Designed to be on top of the deserialized tree.
*/
@RequiredArgsConstructor
public class CursorCloseOperator extends PhysicalPlan {

// Entire deserialized from cursor plan tree
private final PhysicalPlan input;

@Override
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) {
return visitor.visitCursorClose(this, context);
}

@Override
public boolean hasNext() {
return false;
}

@Override
public ExprValue next() {
throw new IllegalStateException();
}

@Override
public List<PhysicalPlan> getChild() {
return List.of(input);
}

/**
* Provides an empty schema, because this plan node is always located on the top of the tree.
*/
@Override
public ExecutionEngine.Schema schema() {
return new ExecutionEngine.Schema(List.of());
}

// TODO remove
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public long getTotalHits() {
return 0;
}

@Override
public void open() {
// no-op, no search should be invoked.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ public R visitAD(PhysicalPlan node, C context) {
public R visitML(PhysicalPlan node, C context) {
return visitNode(node, context);
}

public R visitCursorClose(CursorCloseOperator node, C context) {
return visitNode(node, context);
}
}
13 changes: 13 additions & 0 deletions core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package org.opensearch.sql.analysis;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -75,6 +76,7 @@
import org.opensearch.sql.ast.expression.ScoreFunction;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.ast.tree.AD;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.FetchCursor;
import org.opensearch.sql.ast.tree.Kmeans;
import org.opensearch.sql.ast.tree.ML;
Expand All @@ -91,6 +93,7 @@
import org.opensearch.sql.expression.function.OpenSearchFunctions;
import org.opensearch.sql.expression.window.WindowDefinition;
import org.opensearch.sql.planner.logical.LogicalAD;
import org.opensearch.sql.planner.logical.LogicalCloseCursor;
import org.opensearch.sql.planner.logical.LogicalFetchCursor;
import org.opensearch.sql.planner.logical.LogicalFilter;
import org.opensearch.sql.planner.logical.LogicalMLCommons;
Expand Down Expand Up @@ -1651,4 +1654,14 @@ void visit_cursor() {
assertEquals(new LogicalFetchCursor("test",
dataSourceService.getDataSource("@opensearch").getStorageEngine()), actual);
}

@Test
public void visit_close_cursor() {
var analyzed = analyze(new CloseCursor().attach(new FetchCursor("pewpew")));
assertAll(
() -> assertTrue(analyzed instanceof LogicalCloseCursor),
() -> assertTrue(analyzed.getChild().get(0) instanceof LogicalFetchCursor),
() -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor())
);
}
}
Loading