diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 02b97baa93..402de50dfb 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -43,6 +43,7 @@ import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.FetchCursor; @@ -83,6 +84,7 @@ import org.opensearch.sql.expression.parse.ParseExpression; import org.opensearch.sql.planner.logical.LogicalAD; import org.opensearch.sql.planner.logical.LogicalAggregation; +import org.opensearch.sql.planner.logical.LogicalCloseCursor; import org.opensearch.sql.planner.logical.LogicalDedupe; import org.opensearch.sql.planner.logical.LogicalEval; import org.opensearch.sql.planner.logical.LogicalFetchCursor; @@ -572,6 +574,17 @@ public LogicalPlan visitPaginate(Paginate paginate, AnalysisContext context) { return new LogicalPaginate(paginate.getPageSize(), List.of(child)); } + @Override + public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) { + return new LogicalFetchCursor(cursor.getCursor(), + dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine()); + } + + @Override + public LogicalPlan visitCloseCursor(CloseCursor closeCursor, AnalysisContext context) { + return new LogicalCloseCursor(closeCursor.getChild().get(0).accept(this, context)); + } + /** * The first argument is always "asc", others are optional. * Given nullFirst argument, use its value. Otherwise just use DEFAULT_ASC/DESC. @@ -587,10 +600,4 @@ private SortOption analyzeSortOption(List fieldArgs) { } return asc ? SortOption.DEFAULT_ASC : SortOption.DEFAULT_DESC; } - - @Override - public LogicalPlan visitFetchCursor(FetchCursor cursor, AnalysisContext context) { - return new LogicalFetchCursor(cursor.getCursor(), - dataSourceService.getDataSource(DEFAULT_DATASOURCE_NAME).getStorageEngine()); - } } diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index beb4833d4d..3e81509fae 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -41,6 +41,7 @@ import org.opensearch.sql.ast.statement.Statement; import org.opensearch.sql.ast.tree.AD; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.Dedupe; import org.opensearch.sql.ast.tree.Eval; import org.opensearch.sql.ast.tree.FetchCursor; @@ -302,6 +303,10 @@ public T visitPaginate(Paginate paginate, C context) { } public T visitFetchCursor(FetchCursor cursor, C context) { - return visit(cursor, context); + return visitChildren(cursor, context); + } + + public T visitCloseCursor(CloseCursor closeCursor, C context) { + return visitChildren(closeCursor, context); } } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java b/core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java new file mode 100644 index 0000000000..cf82c2b070 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/CloseCursor.java @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import java.util.List; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.Node; + +/** + * AST node to represent close cursor operation. + * Actually a wrapper to the AST. + */ +public class CloseCursor extends UnresolvedPlan { + + /** + * An instance of {@link FetchCursor}. + */ + private UnresolvedPlan cursor; + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitCloseCursor(this, context); + } + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.cursor = child; + return this; + } + + @Override + public List getChild() { + return List.of(cursor); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/CommandPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/CommandPlan.java new file mode 100644 index 0000000000..0ea5266084 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/execution/CommandPlan.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; + +/** + * Query plan which does not reflect a search query being executed. + * It contains a command or an action, for example, a DDL query. + */ +public class CommandPlan extends AbstractPlan { + + /** + * The query plan ast. + */ + protected final UnresolvedPlan plan; + + /** + * Query service. + */ + protected final QueryService queryService; + + protected final ResponseListener listener; + + /** Constructor. */ + public CommandPlan(QueryId queryId, UnresolvedPlan plan, QueryService queryService, + ResponseListener listener) { + super(queryId); + this.plan = plan; + this.queryService = queryService; + this.listener = listener; + } + + @Override + public void execute() { + queryService.execute(plan, listener); + } + + @Override + public void explain(ResponseListener listener) { + throw new UnsupportedOperationException("CommandPlan does not support explain"); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java index df9bc0c734..aeecf3e76f 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java @@ -18,9 +18,7 @@ import org.opensearch.sql.executor.QueryService; /** - * Query plan. Which includes. - * - *

select query. + * Query plan which includes a select query. */ public class QueryPlan extends AbstractPlan { diff --git a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java index cc53f5060b..3273eb3c18 100644 --- a/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java +++ b/core/src/main/java/org/opensearch/sql/executor/execution/QueryPlanFactory.java @@ -17,6 +17,7 @@ import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; @@ -88,6 +89,15 @@ boolean canConvertToCursor(UnresolvedPlan plan) { return plan.accept(new CanPaginateVisitor(), null); } + /** + * Creates a {@link CloseCursor} command on a cursor. + */ + public AbstractPlan createCloseCursor(String cursor, + ResponseListener queryResponseListener) { + return new CommandPlan(QueryId.queryId(), new CloseCursor().attach(new FetchCursor(cursor)), + queryService, queryResponseListener); + } + @Override public AbstractPlan visitQuery( Query node, diff --git a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java index a1897245ea..af234027e6 100644 --- a/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java +++ b/core/src/main/java/org/opensearch/sql/planner/DefaultImplementor.java @@ -3,11 +3,11 @@ * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.sql.planner; import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.planner.logical.LogicalAggregation; +import org.opensearch.sql.planner.logical.LogicalCloseCursor; import org.opensearch.sql.planner.logical.LogicalDedupe; import org.opensearch.sql.planner.logical.LogicalEval; import org.opensearch.sql.planner.logical.LogicalFetchCursor; @@ -25,6 +25,7 @@ import org.opensearch.sql.planner.logical.LogicalValues; import org.opensearch.sql.planner.logical.LogicalWindow; import org.opensearch.sql.planner.physical.AggregationOperator; +import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.DedupeOperator; import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; @@ -155,6 +156,11 @@ public PhysicalPlan visitFetchCursor(LogicalFetchCursor plan, C context) { return new PlanSerializer(plan.getEngine()).convertToPlan(plan.getCursor()); } + @Override + public PhysicalPlan visitCloseCursor(LogicalCloseCursor node, C context) { + return new CursorCloseOperator(visitChild(node, context)); + } + protected PhysicalPlan visitChild(LogicalPlan node, C context) { // Logical operators visited here must have a single child return node.getChild().get(0).accept(this, context); diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalCloseCursor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalCloseCursor.java new file mode 100644 index 0000000000..e5c30a4f4f --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalCloseCursor.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.logical; + +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * A logical plan node which wraps {@link org.opensearch.sql.planner.LogicalCursor} + * and represent a cursor close operation. + */ +@ToString +@EqualsAndHashCode(callSuper = false) +public class LogicalCloseCursor extends LogicalPlan { + + public LogicalCloseCursor(LogicalPlan child) { + super(List.of(child)); + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitCloseCursor(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalFetchCursor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalFetchCursor.java index d9a426dfe7..e4a0482aac 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalFetchCursor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalFetchCursor.java @@ -13,6 +13,9 @@ import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; import org.opensearch.sql.storage.StorageEngine; +/** + * A plan node which represents operation of fetching a next page from the cursor. + */ @EqualsAndHashCode(callSuper = false) @ToString public class LogicalFetchCursor extends LogicalPlan { diff --git a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java index 796fb50f26..dbe21d38e0 100644 --- a/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitor.java @@ -112,4 +112,8 @@ public R visitPaginate(LogicalPaginate plan, C context) { public R visitFetchCursor(LogicalFetchCursor plan, C context) { return visitNode(plan, context); } + + public R visitCloseCursor(LogicalCloseCursor plan, C context) { + return visitNode(plan, context); + } } diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java new file mode 100644 index 0000000000..13a37fb61e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/planner/physical/CursorCloseOperator.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * A plan node which blocks issuing a request in {@link #open} and + * getting results in {@link #hasNext}, but doesn't block releasing resources in {@link #close}. + * Designed to be on top of the deserialized tree. + */ +@RequiredArgsConstructor +public class CursorCloseOperator extends PhysicalPlan { + + // Entire deserialized from cursor plan tree + private final PhysicalPlan input; + + @Override + public R accept(PhysicalPlanNodeVisitor visitor, C context) { + return visitor.visitCursorClose(this, context); + } + + @Override + public boolean hasNext() { + return false; + } + + @Override + public ExprValue next() { + throw new IllegalStateException(); + } + + @Override + public List getChild() { + return List.of(input); + } + + /** + * Provides an empty schema, because this plan node is always located on the top of the tree. + */ + @Override + public ExecutionEngine.Schema schema() { + return new ExecutionEngine.Schema(List.of()); + } + + // TODO remove + @Override + public long getTotalHits() { + return 0; + } + + @Override + public void open() { + // no-op, no search should be invoked. + } +} diff --git a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java index cb488700a0..1e8f08d39f 100644 --- a/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitor.java @@ -92,4 +92,8 @@ public R visitAD(PhysicalPlan node, C context) { public R visitML(PhysicalPlan node, C context) { return visitNode(node, context); } + + public R visitCursorClose(CursorCloseOperator node, C context) { + return visitNode(node, context); + } } diff --git a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java index dda359a7df..d5a8cd4322 100644 --- a/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java +++ b/core/src/test/java/org/opensearch/sql/analysis/AnalyzerTest.java @@ -7,6 +7,7 @@ package org.opensearch.sql.analysis; import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -75,6 +76,7 @@ import org.opensearch.sql.ast.expression.ScoreFunction; import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.tree.AD; +import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.FetchCursor; import org.opensearch.sql.ast.tree.Kmeans; import org.opensearch.sql.ast.tree.ML; @@ -91,6 +93,7 @@ import org.opensearch.sql.expression.function.OpenSearchFunctions; import org.opensearch.sql.expression.window.WindowDefinition; import org.opensearch.sql.planner.logical.LogicalAD; +import org.opensearch.sql.planner.logical.LogicalCloseCursor; import org.opensearch.sql.planner.logical.LogicalFetchCursor; import org.opensearch.sql.planner.logical.LogicalFilter; import org.opensearch.sql.planner.logical.LogicalMLCommons; @@ -1651,4 +1654,14 @@ void visit_cursor() { assertEquals(new LogicalFetchCursor("test", dataSourceService.getDataSource("@opensearch").getStorageEngine()), actual); } + + @Test + public void visit_close_cursor() { + var analyzed = analyze(new CloseCursor().attach(new FetchCursor("pewpew"))); + assertAll( + () -> assertTrue(analyzed instanceof LogicalCloseCursor), + () -> assertTrue(analyzed.getChild().get(0) instanceof LogicalFetchCursor), + () -> assertEquals("pewpew", ((LogicalFetchCursor) analyzed.getChild().get(0)).getCursor()) + ); + } } diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/CommandPlanTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/CommandPlanTest.java new file mode 100644 index 0000000000..aa300cb0da --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/execution/CommandPlanTest.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.sql.executor.execution; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ast.tree.UnresolvedPlan; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.executor.QueryId; +import org.opensearch.sql.executor.QueryService; +import org.opensearch.sql.planner.logical.LogicalPlan; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +public class CommandPlanTest { + + @Test + public void execute_without_error() { + QueryService qs = mock(QueryService.class); + ResponseListener listener = mock(ResponseListener.class); + doNothing().when(qs).execute(any(), any()); + + new CommandPlan(QueryId.queryId(), mock(UnresolvedPlan.class), qs, listener).execute(); + + verify(qs).execute(any(), any()); + verify(listener, never()).onFailure(any()); + } + + @Test + public void execute_with_error() { + QueryService qs = mock(QueryService.class, withSettings().defaultAnswer(CALLS_REAL_METHODS)); + ResponseListener listener = mock(ResponseListener.class); + doThrow(new RuntimeException()) + .when(qs).executePlan(any(LogicalPlan.class), any(), any()); + + new CommandPlan(QueryId.queryId(), mock(UnresolvedPlan.class), qs, listener).execute(); + + verify(listener).onFailure(any()); + } + + @Test + @SuppressWarnings("unchecked") + public void explain_not_supported() { + QueryService qs = mock(QueryService.class); + ResponseListener listener = mock(ResponseListener.class); + ResponseListener explainListener = mock(ResponseListener.class); + + var exception = assertThrows(Throwable.class, () -> + new CommandPlan(QueryId.queryId(), mock(UnresolvedPlan.class), qs, listener) + .explain(explainListener)); + assertEquals("CommandPlan does not support explain", exception.getMessage()); + + verify(listener, never()).onResponse(any()); + verify(listener, never()).onFailure(any()); + verify(explainListener, never()).onResponse(any()); + verify(explainListener, never()).onFailure(any()); + } +} diff --git a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java index c35d506fe7..2d346e4c2a 100644 --- a/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/execution/QueryPlanFactoryTest.java @@ -13,18 +13,23 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.sql.executor.execution.QueryPlanFactory.NO_CONSUMER_RESPONSE_LISTENER; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.CloseCursor; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.exception.UnsupportedCursorRequestException; @@ -33,6 +38,7 @@ import org.opensearch.sql.executor.pagination.CanPaginateVisitor; @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class QueryPlanFactoryTest { @Mock @@ -58,7 +64,7 @@ void init() { } @Test - public void createFromQueryShouldSuccess() { + public void create_from_query_should_success() { Statement query = new Query(plan, 0); AbstractPlan queryExecution = factory.create(query, Optional.of(queryListener), Optional.empty()); @@ -66,7 +72,7 @@ public void createFromQueryShouldSuccess() { } @Test - public void createFromExplainShouldSuccess() { + public void create_from_explain_should_success() { Statement query = new Explain(new Query(plan, 0)); AbstractPlan queryExecution = factory.create(query, Optional.empty(), Optional.of(explainListener)); @@ -74,7 +80,7 @@ public void createFromExplainShouldSuccess() { } @Test - public void createFromCursorShouldSuccess() { + public void create_from_cursor_should_success() { AbstractPlan queryExecution = factory.create("", false, queryListener, explainListener); AbstractPlan explainExecution = factory.create("", true, @@ -86,7 +92,7 @@ public void createFromCursorShouldSuccess() { } @Test - public void createFromQueryWithoutQueryListenerShouldThrowException() { + public void create_from_query_without_query_listener_should_throw_exception() { Statement query = new Query(plan, 0); IllegalArgumentException exception = @@ -96,7 +102,7 @@ public void createFromQueryWithoutQueryListenerShouldThrowException() { } @Test - public void createFromExplainWithoutExplainListenerShouldThrowException() { + public void create_from_explain_without_explain_listener_should_throw_exception() { Statement query = new Explain(new Query(plan, 0)); IllegalArgumentException exception = @@ -106,7 +112,7 @@ public void createFromExplainWithoutExplainListenerShouldThrowException() { } @Test - public void noConsumerResponseChannel() { + public void no_consumer_response_channel() { IllegalStateException exception = assertThrows( IllegalStateException.class, @@ -123,7 +129,7 @@ public void noConsumerResponseChannel() { } @Test - public void createQueryWithFetchSizeWhichCanBePaged() { + public void create_query_with_fetch_size_which_can_be_paged() { when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.TRUE); factory = new QueryPlanFactory(queryService); Statement query = new Query(plan, 10); @@ -133,7 +139,7 @@ public void createQueryWithFetchSizeWhichCanBePaged() { } @Test - public void createQueryWithFetchSizeWhichCannotBePaged() { + public void create_query_with_fetch_size_which_cannot_be_paged() { when(plan.accept(any(CanPaginateVisitor.class), any())).thenReturn(Boolean.FALSE); factory = new QueryPlanFactory(queryService); Statement query = new Query(plan, 10); @@ -141,4 +147,15 @@ public void createQueryWithFetchSizeWhichCannotBePaged() { () -> factory.create(query, Optional.of(queryListener), Optional.empty())); } + + @Test + public void create_close_cursor() { + factory = new QueryPlanFactory(queryService); + var plan = factory.createCloseCursor("pewpew", queryListener); + assertTrue(plan instanceof CommandPlan); + plan.execute(); + var captor = ArgumentCaptor.forClass(UnresolvedPlan.class); + verify(queryService).execute(captor.capture(), any()); + assertTrue(captor.getValue() instanceof CloseCursor); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java index d43cb89a3e..c382f2634e 100644 --- a/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/DefaultImplementorTest.java @@ -3,12 +3,15 @@ * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.sql.planner; import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; import static org.opensearch.sql.data.type.ExprCoreType.STRING; import static org.opensearch.sql.expression.DSL.literal; @@ -40,7 +43,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.ast.tree.RareTopN.CommandType; import org.opensearch.sql.ast.tree.Sort; @@ -55,9 +57,11 @@ import org.opensearch.sql.expression.aggregation.NamedAggregator; import org.opensearch.sql.expression.window.WindowDefinition; import org.opensearch.sql.expression.window.ranking.RowNumberFunction; +import org.opensearch.sql.planner.logical.LogicalCloseCursor; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; import org.opensearch.sql.storage.StorageEngine; @@ -226,7 +230,7 @@ public void visitWindowOperator_should_return_PhysicalWindowOperator() { @Test void visitLogicalCursor_deserializes_it() { - var engine = Mockito.mock(StorageEngine.class); + var engine = mock(StorageEngine.class); var physicalPlan = new TestOperator(); var logicalPlan = LogicalPlanDSL.fetchCursor(new PlanSerializer(engine) @@ -236,7 +240,7 @@ void visitLogicalCursor_deserializes_it() { @Test public void visitTableScanBuilder_should_build_TableScanOperator() { - TableScanOperator tableScanOperator = Mockito.mock(TableScanOperator.class); + TableScanOperator tableScanOperator = mock(TableScanOperator.class); TableScanBuilder tableScanBuilder = new TableScanBuilder() { @Override public TableScanOperator build() { @@ -249,7 +253,7 @@ public TableScanOperator build() { @Test public void visitTableWriteBuilder_should_build_TableWriteOperator() { LogicalPlan child = values(); - TableWriteOperator tableWriteOperator = Mockito.mock(TableWriteOperator.class); + TableWriteOperator tableWriteOperator = mock(TableWriteOperator.class); TableWriteBuilder logicalPlan = new TableWriteBuilder(child) { @Override public TableWriteOperator build(PhysicalPlan child) { @@ -258,4 +262,15 @@ public TableWriteOperator build(PhysicalPlan child) { }; assertEquals(tableWriteOperator, logicalPlan.accept(implementor, null)); } + + @Test + public void visitCloseCursor_should_build_CursorCloseOperator() { + var logicalChild = mock(LogicalPlan.class); + var physicalChild = mock(PhysicalPlan.class); + when(logicalChild.accept(implementor, null)).thenReturn(physicalChild); + var logicalPlan = new LogicalCloseCursor(logicalChild); + var implemented = logicalPlan.accept(implementor, null); + assertTrue(implemented instanceof CursorCloseOperator); + assertSame(physicalChild, implemented.getChild().get(0)); + } } diff --git a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java index e826a13f6c..d4d5c89c9b 100644 --- a/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/logical/LogicalPlanNodeVisitorTest.java @@ -132,10 +132,13 @@ public TableWriteOperator build(PhysicalPlan child) { LogicalNested nested = new LogicalNested(null, nestedArgs, projectList); LogicalFetchCursor cursor = new LogicalFetchCursor("n:test", mock(StorageEngine.class)); + + LogicalCloseCursor closeCursor = new LogicalCloseCursor(cursor); + return Stream.of( relation, tableScanBuilder, write, tableWriteBuilder, filter, aggregation, rename, project, remove, eval, sort, dedup, window, rareTopN, highlight, mlCommons, ad, ml, paginate, nested, - cursor + cursor, closeCursor ).map(Arguments::of); } diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java new file mode 100644 index 0000000000..66111c1042 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/planner/physical/CursorCloseOperatorTest.java @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.planner.physical; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; +import org.junit.jupiter.api.Test; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +public class CursorCloseOperatorTest { + + @Test + public void never_hasNext() { + var plan = new CursorCloseOperator(null); + assertFalse(plan.hasNext()); + plan.open(); + assertFalse(plan.hasNext()); + } + + // TODO remove + @Test + public void no_total_hits() { + var plan = new CursorCloseOperator(null); + assertEquals(0, plan.getTotalHits()); + plan.open(); + assertEquals(0, plan.getTotalHits()); + } + + @Test + public void open_is_not_propagated() { + var child = mock(PhysicalPlan.class); + var plan = new CursorCloseOperator(child); + plan.open(); + verify(child, never()).open(); + } + + @Test + public void close_is_propagated() { + var child = mock(PhysicalPlan.class); + var plan = new CursorCloseOperator(child); + plan.close(); + verify(child).close(); + } + + @Test + public void next_always_throws() { + var plan = new CursorCloseOperator(null); + assertThrows(Throwable.class, plan::next); + plan.open(); + assertThrows(Throwable.class, plan::next); + } + + @Test + public void produces_empty_schema() { + var child = mock(PhysicalPlan.class); + var plan = new CursorCloseOperator(child); + assertEquals(0, plan.schema().getColumns().size()); + verify(child, never()).schema(); + } +} diff --git a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java index fb687277ce..8ed4881d33 100644 --- a/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java +++ b/core/src/test/java/org/opensearch/sql/planner/physical/PhysicalPlanNodeVisitorTest.java @@ -9,9 +9,22 @@ import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; import static org.opensearch.sql.expression.DSL.named; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.agg; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.dedupe; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.eval; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.filter; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.limit; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.project; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rareTopN; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.remove; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.rename; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.sort; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.values; +import static org.opensearch.sql.planner.physical.PhysicalPlanDSL.window; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -19,9 +32,15 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Stream; import org.apache.commons.lang3.tuple.Pair; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.ast.tree.RareTopN.CommandType; @@ -34,6 +53,7 @@ * Todo, testing purpose, delete later. */ @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class PhysicalPlanNodeVisitorTest extends PhysicalPlanTestBase { @Mock PhysicalPlan plan; @@ -43,13 +63,13 @@ class PhysicalPlanNodeVisitorTest extends PhysicalPlanTestBase { @Test public void print_physical_plan() { PhysicalPlan plan = - PhysicalPlanDSL.remove( - PhysicalPlanDSL.project( - PhysicalPlanDSL.rename( - PhysicalPlanDSL.agg( - PhysicalPlanDSL.rareTopN( - PhysicalPlanDSL.filter( - PhysicalPlanDSL.limit( + remove( + project( + rename( + agg( + rareTopN( + filter( + limit( new TestScan(), 1, 1 ), @@ -76,71 +96,59 @@ public void print_physical_plan() { printer.print(plan)); } - @Test - public void test_PhysicalPlanVisitor_should_return_null() { + public static Stream getPhysicalPlanForTest() { + PhysicalPlan plan = mock(PhysicalPlan.class); + ReferenceExpression ref = mock(ReferenceExpression.class); + PhysicalPlan filter = - PhysicalPlanDSL.filter( - new TestScan(), DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))); - assertNull(filter.accept(new PhysicalPlanNodeVisitor() { - }, null)); + filter(new TestScan(), DSL.equal(DSL.ref("response", INTEGER), DSL.literal(10))); PhysicalPlan aggregation = - PhysicalPlanDSL.agg( - filter, ImmutableList.of(DSL.named("avg(response)", + agg(filter, ImmutableList.of(DSL.named("avg(response)", DSL.avg(DSL.ref("response", INTEGER)))), ImmutableList.of()); - assertNull(aggregation.accept(new PhysicalPlanNodeVisitor() { - }, null)); PhysicalPlan rename = - PhysicalPlanDSL.rename( - aggregation, ImmutableMap.of(DSL.ref("ivalue", INTEGER), DSL.ref("avg(response)", + rename(aggregation, ImmutableMap.of(DSL.ref("ivalue", INTEGER), DSL.ref("avg(response)", DOUBLE))); - assertNull(rename.accept(new PhysicalPlanNodeVisitor() { - }, null)); - PhysicalPlan project = PhysicalPlanDSL.project(plan, named("ref", ref)); - assertNull(project.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan project = project(plan, named("ref", ref)); - PhysicalPlan window = PhysicalPlanDSL.window(plan, named(DSL.rowNumber()), + PhysicalPlan window = window(plan, named(DSL.rowNumber()), new WindowDefinition(emptyList(), emptyList())); - assertNull(window.accept(new PhysicalPlanNodeVisitor() { - }, null)); - PhysicalPlan remove = PhysicalPlanDSL.remove(plan, ref); - assertNull(remove.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan remove = remove(plan, ref); - PhysicalPlan eval = PhysicalPlanDSL.eval(plan, Pair.of(ref, ref)); - assertNull(eval.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan eval = eval(plan, Pair.of(ref, ref)); - PhysicalPlan sort = PhysicalPlanDSL.sort(plan, Pair.of(SortOption.DEFAULT_ASC, ref)); - assertNull(sort.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan sort = sort(plan, Pair.of(SortOption.DEFAULT_ASC, ref)); - PhysicalPlan dedupe = PhysicalPlanDSL.dedupe(plan, ref); - assertNull(dedupe.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan dedupe = dedupe(plan, ref); - PhysicalPlan values = PhysicalPlanDSL.values(emptyList()); - assertNull(values.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan values = values(emptyList()); - PhysicalPlan rareTopN = - PhysicalPlanDSL.rareTopN(plan, CommandType.TOP, 5, ImmutableList.of(), ref); - assertNull(rareTopN.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan rareTopN = rareTopN(plan, CommandType.TOP, 5, ImmutableList.of(), ref); - PhysicalPlan limit = PhysicalPlanDSL.limit(plan, 1, 1); - assertNull(limit.accept(new PhysicalPlanNodeVisitor() { - }, null)); + PhysicalPlan limit = limit(plan, 1, 1); Set nestedArgs = Set.of("nested.test"); - Map> groupedFieldsByPath = - Map.of("nested", List.of("nested.test")); + Map> groupedFieldsByPath = Map.of("nested", List.of("nested.test")); PhysicalPlan nested = new NestedOperator(plan, nestedArgs, groupedFieldsByPath); - assertNull(nested.accept(new PhysicalPlanNodeVisitor() { + + PhysicalPlan cursorClose = new CursorCloseOperator(plan); + + return Stream.of(Arguments.of(filter, "filter"), Arguments.of(aggregation, "aggregation"), + Arguments.of(rename, "rename"), Arguments.of(project, "project"), + Arguments.of(window, "window"), Arguments.of(remove, "remove"), + Arguments.of(eval, "eval"), Arguments.of(sort, "sort"), Arguments.of(dedupe, "dedupe"), + Arguments.of(values, "values"), Arguments.of(rareTopN, "rareTopN"), + Arguments.of(limit, "limit"), Arguments.of(nested, "nested"), + Arguments.of(cursorClose, "cursorClose")); + } + + @ParameterizedTest(name = "{1}") + @MethodSource("getPhysicalPlanForTest") + public void test_PhysicalPlanVisitor_should_return_null(PhysicalPlan plan, String name) { + assertNull(plan.accept(new PhysicalPlanNodeVisitor() { }, null)); } diff --git a/docs/dev/Pagination-v2.md b/docs/dev/Pagination-v2.md index 5379153a4a..f80b95fae7 100644 --- a/docs/dev/Pagination-v2.md +++ b/docs/dev/Pagination-v2.md @@ -330,9 +330,9 @@ stateDiagram-v2 AggregationIP --> RelationIP } -state "Subsequent Query Request" As Sub { -FetchCursor -} + state "Subsequent Query Request" As Sub { + FetchCursor + } ``` @@ -500,6 +500,12 @@ Subsequent pages are processed by a new workflow. The key point there: ```mermaid sequenceDiagram + participant SQLService + participant QueryPlanFactory + participant QueryService + participant OpenSearchExecutionEngine + participant DefaultImplementor + participant PlanSerializer SQLService ->>+ QueryPlanFactory : execute QueryPlanFactory ->>+ QueryService : execute @@ -612,13 +618,9 @@ PlanSerializer ->>+ ProjectOperator : serialize ResourceMonitorPlan -->>- ProjectOperator : delegate Note over ResourceMonitorPlan : ResourceMonitorPlan
is not serialized ProjectOperator ->>+ OpenSearchIndexScan : writeExternal - alt First page - OpenSearchIndexScan ->>+ OpenSearchScrollRequest : writeTo - OpenSearchScrollRequest -->>- OpenSearchIndexScan : serialized request - else Subsequent page - OpenSearchIndexScan ->>+ OpenSearchScrollRequest : writeTo - OpenSearchScrollRequest -->>- OpenSearchIndexScan : serialized request - end + OpenSearchIndexScan ->>+ OpenSearchScrollRequest : writeTo + Note over OpenSearchScrollRequest : dump private fields + OpenSearchScrollRequest -->>- OpenSearchIndexScan : serialized request Note over OpenSearchIndexScan : dump private fields OpenSearchIndexScan -->>- ProjectOperator : serialized ProjectOperator -->>- PlanSerializer : serialized @@ -656,6 +658,125 @@ PlanSerializer ->>+ CursorDeserializationStream : deserialize deactivate CursorDeserializationStream ``` +#### Close Cursor + +A user can forcibly close a cursor (scroll) at any moment of paging. Automatic close occurs when paging is complete and no more results left. +Close cursor protocol defined by following: +1. REST endpoint: `/_plugins/_sql/close` +2. Request type: `POST` +3. Request format: +```json +{ + "cursor" : "" +} +``` +4. Response format: +```json +{ + "succeeded": true +} +``` +5. Failure or error: [error response](#error-response) +6. Use or sequential close of already closed cursor produces the same error as use of expired/auto-closed/non-existing cursor. + +```mermaid +sequenceDiagram +SQLService ->>+ QueryPlanFactory : execute + QueryPlanFactory ->>+ QueryService : execute + QueryService ->>+ Analyzer : analyze + Analyzer -->>- QueryService : new LogicalCloseCursor + QueryService ->>+ Planner : plan + Planner ->>+ DefaultImplementor : implement + DefaultImplementor ->>+ PlanSerializer : deserialize + PlanSerializer -->>- DefaultImplementor: physical query plan + DefaultImplementor -->>- Planner : new CloseOperator + Planner -->>- QueryService : CloseOperator + QueryService ->>+ OpenSearchExecutionEngine : execute + Note over OpenSearchExecutionEngine : Open is no-op, no request issued,
no results received and processed + Note over OpenSearchExecutionEngine : Clean-up (clear scroll) on auto-close + OpenSearchExecutionEngine -->>- QueryService: execution completed + QueryService -->>- QueryPlanFactory : execution completed + QueryPlanFactory -->>- SQLService : execution completed +``` + +```mermaid +stateDiagram-v2 + direction LR + state "Abstract Query Plan" as Abstract { + state "CommandPlan" as CommandPlan { + state "Unresolved Query Plan" as Unresolved { + state "CloseCursor" as CloseCursor + state "FetchCursor" as FetchCursor + + CloseCursor --> FetchCursor + } + } + } + state "Logical Query Plan" as Logical { + state "LogicalCloseCursor" as LogicalCloseCursor + state "LogicalFetchCursor" as LogicalFetchCursor + + LogicalCloseCursor --> LogicalFetchCursor + } + state "Optimized Query Plan" as Optimized { + state "LogicalCloseCursor" as LogicalCloseCursorO + state "LogicalFetchCursor" as LogicalFetchCursorO + + LogicalCloseCursorO --> LogicalFetchCursorO + } + state "Physical Query Plan" as Physical { + state "CursorCloseOperator" as CursorCloseOperator + state "ProjectOperator" as ProjectOperator + state "..." as ... + state "OpenSearchIndexScan" as OpenSearchIndexScan + + CursorCloseOperator --> ProjectOperator + ProjectOperator --> ... + ... --> OpenSearchIndexScan + } + + [*] --> Unresolved : QueryPlanner + Unresolved --> Logical : Planner + Logical --> Optimized : Optimizer + Optimized --> Physical : Implementor +``` + +`CursorCloseOperator` provides a dummy (empty, since not used) `Schema`, does not perform `open` and always returns `false` by `hasNext`. Such behavior makes it a no-op operator which blocks underlying Physical Plan Tree from issuing any search request, but does not block auto-close provided by `AutoCloseable`. Default close action clears scroll context. +Regular paging doesn't execute scroll clear, because it checks whether paging is finished or not and raises a flag to prevent clear. This check performed when search response recevied, what never happen due to `CursorCloseOperator`. + +```py +class OpenSearchScrollRequest: + bool needClean = true + + def search: + ... + needClean = response.isEmpty() + + def clean: + if needClean: + clearScroll() +``` + +```py +class CursorCloseOperator(PhysicalPlan): + PhysicalPlan tree + def open: + pass + # no-op, don't propagate `open` of underlying plan tree + + def hasNext: + return false +``` + +```py +class PhysicalPlan: + def open: + innerPlan.open() + + def close: + innerPlan.close() +``` + #### Total Hits Total Hits is the number of rows matching the search criteria; with `select *` queries it is equal to row (doc) number in the table (index). diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java index a1d353cde8..72ec20c679 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/PaginationIT.java @@ -7,11 +7,16 @@ import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_CALCS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ONLINE; +import static org.opensearch.sql.legacy.plugin.RestSqlAction.EXPLAIN_API_ENDPOINT; import java.io.IOException; + +import lombok.SneakyThrows; import org.json.JSONObject; import org.junit.Ignore; import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; import org.opensearch.client.ResponseException; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.SQLIntegTestCase; @@ -76,4 +81,35 @@ public void testCursorTimeout() throws IOException, InterruptedException { wipeAllClusterSettings(); } + + @Test + @SneakyThrows + public void testCloseCursor() { + // Initial page request to get cursor + var query = "SELECT * from " + TEST_INDEX_CALCS; + var response = new JSONObject(executeFetchQuery(query, 4, "jdbc")); + assertTrue(response.has("cursor")); + var cursor = response.getString("cursor"); + + // Close the cursor + Request closeCursorRequest = new Request("POST", "_plugins/_sql/close"); + closeCursorRequest.setJsonEntity(String.format("{ \"cursor\" : \"%s\" } ", cursor)); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + closeCursorRequest.setOptions(restOptionsBuilder); + response = new JSONObject(executeRequest(closeCursorRequest)); + assertTrue(response.has("succeeded")); + assertTrue(response.getBoolean("succeeded")); + + // Test that cursor is no longer available + ResponseException exception = + expectThrows(ResponseException.class, () -> executeCursorQuery(cursor)); + response = new JSONObject(TestUtils.getResponseBody(exception.getResponse())); + assertEquals(response.getJSONObject("error").getString("reason"), + "Error occurred in OpenSearch engine: all shards failed"); + assertTrue(response.getJSONObject("error").getString("details") + .contains("SearchContextMissingException[No search context found for id")); + assertEquals(response.getJSONObject("error").getString("type"), + "SearchPhaseExecutionException"); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/util/StandaloneModule.java b/integ-test/src/test/java/org/opensearch/sql/util/StandaloneModule.java index e38f408514..c347ea5244 100644 --- a/integ-test/src/test/java/org/opensearch/sql/util/StandaloneModule.java +++ b/integ-test/src/test/java/org/opensearch/sql/util/StandaloneModule.java @@ -99,7 +99,7 @@ public SQLService sqlService(QueryManager queryManager, QueryPlanFactory queryPl } @Provides - public PlanSerializer paginatedPlanCache(StorageEngine storageEngine) { + public PlanSerializer planSerializer(StorageEngine storageEngine) { return new PlanSerializer(storageEngine); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java index c48b18a609..a432c2f473 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSQLQueryAction.java @@ -34,6 +34,7 @@ import org.opensearch.sql.protocol.response.format.Format; import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.CommandResponseFormatter; import org.opensearch.sql.protocol.response.format.RawResponseFormatter; import org.opensearch.sql.protocol.response.format.ResponseFormatter; import org.opensearch.sql.sql.SQLService; @@ -164,7 +165,10 @@ private ResponseListener createQueryResponseListener( BiConsumer errorHandler) { Format format = request.format(); ResponseFormatter formatter; - if (format.equals(Format.CSV)) { + + if (request.isCursorCloseRequest()) { + formatter = new CommandResponseFormatter(); + } else if (format.equals(Format.CSV)) { formatter = new CsvResponseFormatter(request.sanitize()); } else if (format.equals(Format.RAW)) { formatter = new RawResponseFormatter(); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index f63eb9e204..0f32e4a2ee 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -6,7 +6,6 @@ package org.opensearch.sql.opensearch.executor; -import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.List; import java.util.Map; diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java index 9d71cee8c9..dff5545785 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtector.java @@ -12,6 +12,7 @@ import org.opensearch.sql.opensearch.planner.physical.MLCommonsOperator; import org.opensearch.sql.opensearch.planner.physical.MLOperator; import org.opensearch.sql.planner.physical.AggregationOperator; +import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.DedupeOperator; import org.opensearch.sql.planner.physical.EvalOperator; import org.opensearch.sql.planner.physical.FilterOperator; @@ -42,6 +43,15 @@ public PhysicalPlan protect(PhysicalPlan physicalPlan) { return physicalPlan.accept(this, null); } + /** + * Don't protect {@link CursorCloseOperator} and entire nested tree, because + * {@link CursorCloseOperator} as designed as no-op. + */ + @Override + public PhysicalPlan visitCursorClose(CursorCloseOperator node, Object context) { + return node; + } + @Override public PhysicalPlan visitFilter(FilterOperator node, Object context) { return new FilterOperator(visitInput(node.getInput(), context), node.getConditions()); diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java index 7173eff171..b63db936af 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequest.java @@ -51,9 +51,10 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { @EqualsAndHashCode.Exclude @ToString.Exclude private final OpenSearchExprValueFactory exprValueFactory; + /** - * Scroll id which is set after first request issued. Because ElasticsearchClient is shared by - * multi-thread so this state has to be maintained here. + * Scroll id which is set after first request issued. Because OpenSearchClient is shared by + * multiple threads so this state has to be maintained here. */ @Setter @Getter @@ -61,7 +62,8 @@ public class OpenSearchScrollRequest implements OpenSearchRequest { public static final String NO_SCROLL_ID = ""; - private boolean needClean = false; + @EqualsAndHashCode.Exclude + private boolean needClean = true; @Getter private final List includes; @@ -158,13 +160,7 @@ public boolean hasAnotherBatch() { public void writeTo(StreamOutput out) throws IOException { initialSearchRequest.writeTo(out); out.writeTimeValue(scrollTimeout); - out.writeBoolean(needClean); - if (!needClean) { - // If needClean is true, there is no more data to get from OpenSearch and scrollId is - // used only to clean up OpenSearch context. - - out.writeString(scrollId); - } + out.writeString(scrollId); out.writeStringCollection(includes); indexName.writeTo(out); } @@ -179,10 +175,7 @@ public OpenSearchScrollRequest(StreamInput in, OpenSearchStorageEngine engine) throws IOException { initialSearchRequest = new SearchRequest(in); scrollTimeout = in.readTimeValue(); - needClean = in.readBoolean(); - if (!needClean) { - scrollId = in.readString(); - } + scrollId = in.readString(); includes = in.readStringList(); indexName = new IndexName(in); OpenSearchIndex index = (OpenSearchIndex) engine.getTable(null, indexName.toString()); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 1c978c849e..fd5e747b5f 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -8,7 +8,10 @@ import static java.util.Collections.emptyList; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.opensearch.sql.ast.tree.Sort.SortOption.DEFAULT_ASC; import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; @@ -30,6 +33,8 @@ import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -60,11 +65,13 @@ import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.opensearch.storage.scan.OpenSearchIndexScan; +import org.opensearch.sql.planner.physical.CursorCloseOperator; import org.opensearch.sql.planner.physical.NestedOperator; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; @ExtendWith(MockitoExtension.class) +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) class OpenSearchExecutionProtectorTest { @Mock @@ -87,7 +94,7 @@ public void setup() { } @Test - void testProtectIndexScan() { + void test_protect_indexScan() { String indexName = "test"; final int maxResultWindow = 10000; final int querySizeLimit = 200; @@ -174,7 +181,7 @@ void testProtectIndexScan() { @SuppressWarnings("unchecked") @Test - void testProtectSortForWindowOperator() { + void test_protect_sort_for_windowOperator() { NamedExpression rank = named(mock(RankFunction.class)); Pair sortItem = ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER)); @@ -200,7 +207,7 @@ void testProtectSortForWindowOperator() { } @Test - void testProtectWindowOperatorInput() { + void test_protect_windowOperator_input() { NamedExpression avg = named(mock(AggregateWindowFunction.class)); WindowDefinition windowDefinition = mock(WindowDefinition.class); @@ -219,7 +226,7 @@ void testProtectWindowOperatorInput() { @SuppressWarnings("unchecked") @Test - void testNotProtectWindowOperatorInputIfAlreadyProtected() { + void test_not_protect_windowOperator_input_if_already_protected() { NamedExpression avg = named(mock(AggregateWindowFunction.class)); Pair sortItem = ImmutablePair.of(DEFAULT_ASC, DSL.ref("age", INTEGER)); @@ -244,7 +251,7 @@ void testNotProtectWindowOperatorInputIfAlreadyProtected() { } @Test - void testWithoutProtection() { + void test_without_protection() { Expression filterExpr = literal(ExprBooleanValue.of(true)); assertEquals( @@ -260,7 +267,7 @@ void testWithoutProtection() { } @Test - void testVisitMlCommons() { + void test_visitMLcommons() { NodeClient nodeClient = mock(NodeClient.class); MLCommonsOperator mlCommonsOperator = new MLCommonsOperator( @@ -278,7 +285,7 @@ void testVisitMlCommons() { } @Test - void testVisitAD() { + void test_visitAD() { NodeClient nodeClient = mock(NodeClient.class); ADOperator adOperator = new ADOperator( @@ -296,7 +303,7 @@ void testVisitAD() { } @Test - void testVisitML() { + void test_visitML() { NodeClient nodeClient = mock(NodeClient.class); MLOperator mlOperator = new MLOperator( @@ -316,7 +323,7 @@ void testVisitML() { } @Test - void testVisitNested() { + void test_visitNested() { Set args = Set.of("message.info"); Map> groupedFieldsByPath = Map.of("message", List.of("message.info")); @@ -330,6 +337,14 @@ void testVisitNested() { executionProtector.visitNested(nestedOperator, values(emptyList()))); } + @Test + void do_nothing_with_CursorCloseOperator_and_children() { + var child = mock(PhysicalPlan.class); + var plan = new CursorCloseOperator(child); + assertSame(plan, executionProtector.protect(plan)); + verify(child, never()).accept(executionProtector, null); + } + PhysicalPlan resourceMonitor(PhysicalPlan input) { return new ResourceMonitorPlan(input, resourceMonitor); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java index a2585620aa..69f38ee7f2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchScrollRequestTest.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import lombok.SneakyThrows; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.lucene.search.TotalHits; import org.junit.jupiter.api.DisplayNameGeneration; import org.junit.jupiter.api.DisplayNameGenerator; @@ -188,7 +189,9 @@ void search_withoutIncludes() { } @Test + @SneakyThrows void hasAnotherBatch() { + FieldUtils.writeField(request, "needClean", false, true); request.setScrollId("scroll123"); assertTrue(request.hasAnotherBatch()); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index e974790629..0f39f635a7 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -35,11 +35,13 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; +import org.opensearch.action.search.SearchResponse; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.unit.TimeValue; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; +import org.opensearch.search.SearchHits; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; import org.opensearch.sql.ast.expression.DataType; @@ -114,6 +116,14 @@ void serialize() { var request = new OpenSearchScrollRequest( INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory); request.setScrollId("valid-id"); + // make a response, so OpenSearchResponse::isEmpty would return true and unset needClean + var response = mock(SearchResponse.class); + when(response.getAggregations()).thenReturn(mock()); + var hits = mock(SearchHits.class); + when(response.getHits()).thenReturn(hits); + when(response.getScrollId()).thenReturn("valid-id"); + when(hits.getHits()).thenReturn(new SearchHit[]{ mock() }); + request.search(null, (req) -> response); try (var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, request)) { var planSerializer = new PlanSerializer(engine); @@ -121,7 +131,6 @@ void serialize() { var newPlan = planSerializer.convertToPlan(cursor.toString()); assertEquals(indexScan, newPlan); } - } @Test diff --git a/protocol/src/main/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatter.java b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatter.java new file mode 100644 index 0000000000..68d9be558b --- /dev/null +++ b/protocol/src/main/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatter.java @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.protocol.response.format; + +import lombok.Getter; +import org.opensearch.sql.executor.execution.CommandPlan; +import org.opensearch.sql.opensearch.response.error.ErrorMessage; +import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; +import org.opensearch.sql.protocol.response.QueryResult; + +/** + * A simple response formatter which contains no data. + * Supposed to use with {@link CommandPlan} only. + */ +public class CommandResponseFormatter extends JsonResponseFormatter { + + public CommandResponseFormatter() { + super(Style.PRETTY); + } + + @Override + protected Object buildJsonObject(QueryResult response) { + return new NoQueryResponse(); + } + + @Override + public String format(Throwable t) { + return new JdbcResponseFormatter(Style.PRETTY).format(t); + } + + @Getter + public static class NoQueryResponse { + // in case of failure an exception is thrown + private final boolean succeeded = true; + } +} diff --git a/protocol/src/test/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatterTest.java b/protocol/src/test/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatterTest.java new file mode 100644 index 0000000000..17bd8aee8d --- /dev/null +++ b/protocol/src/test/java/org/opensearch/sql/protocol/response/format/CommandResponseFormatterTest.java @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.protocol.response.format; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.opensearch.sql.data.model.ExprValueUtils.tupleValue; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.DisplayNameGeneration; +import org.junit.jupiter.api.DisplayNameGenerator; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.executor.pagination.Cursor; +import org.opensearch.sql.opensearch.data.type.OpenSearchTextType; +import org.opensearch.sql.protocol.response.QueryResult; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +public class CommandResponseFormatterTest { + + @Test + public void produces_always_same_output_for_any_query_response() { + var formatter = new CommandResponseFormatter(); + assertEquals(formatter.format(mock(QueryResult.class)), + formatter.format(mock(QueryResult.class))); + + QueryResult response = new QueryResult( + new ExecutionEngine.Schema(ImmutableList.of( + new ExecutionEngine.Schema.Column("name", "name", STRING), + new ExecutionEngine.Schema.Column("address", "address", OpenSearchTextType.of()), + new ExecutionEngine.Schema.Column("age", "age", INTEGER))), + ImmutableList.of( + tupleValue(ImmutableMap.builder() + .put("name", "John") + .put("address", "Seattle") + .put("age", 20) + .build())), + new Cursor("test_cursor"), 42); + + assertEquals("{\n" + + " \"succeeded\": true\n" + + "}", + formatter.format(response)); + } + + @Test + public void formats_error_as_default_formatter() { + var exception = new Exception("pewpew", new RuntimeException("meow meow")); + assertEquals(new JdbcResponseFormatter(PRETTY).format(exception), + new CommandResponseFormatter().format(exception)); + } +} diff --git a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java index 889f80223f..91ec00cdd5 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/SQLService.java +++ b/sql/src/main/java/org/opensearch/sql/sql/SQLService.java @@ -72,6 +72,10 @@ private AbstractPlan plan( throw new UnsupportedOperationException("Explain of a paged query continuation " + "is not supported. Use `explain` for the initial query request."); } + if (request.isCursorCloseRequest()) { + return queryExecutionFactory.createCloseCursor(request.getCursor().get(), + queryListener.orElse(null)); + } return queryExecutionFactory.create(request.getCursor().get(), isExplainRequest, queryListener.orElse(null), explainListener.orElse(null)); } else { diff --git a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java index 7545f4cc19..c9321f5775 100644 --- a/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java +++ b/sql/src/main/java/org/opensearch/sql/sql/domain/SQLQueryRequest.java @@ -112,6 +112,10 @@ public boolean isExplainRequest() { return path.endsWith("/_explain"); } + public boolean isCursorCloseRequest() { + return path.endsWith("/close"); + } + /** * Decide on the formatter by the requested format. */ diff --git a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java index f34c95e121..f4342d877d 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/SQLServiceTest.java @@ -94,6 +94,24 @@ public void onFailure(Exception e) { }); } + @Test + public void can_execute_close_cursor_query() { + sqlService.execute( + new SQLQueryRequest(new JSONObject(), null, QUERY + "/close", + Map.of("format", "jdbc"), "n:cursor"), + new ResponseListener<>() { + @Override + public void onResponse(QueryResponse response) { + assertNotNull(response); + } + + @Override + public void onFailure(Exception e) { + fail(e); + } + }); + } + @Test public void can_execute_csv_format_request() { sqlService.execute( diff --git a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java index 62bb665537..1ffa4f0fa8 100644 --- a/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java +++ b/sql/src/test/java/org/opensearch/sql/sql/domain/SQLQueryRequestTest.java @@ -124,6 +124,35 @@ public void should_support_cursor_request() { ); } + @Test + public void should_support_cursor_close_request() { + SQLQueryRequest closeRequest = + SQLQueryRequestBuilder.request(null) + .cursor("pewpew") + .path("_plugins/_sql/close") + .build(); + + SQLQueryRequest emptyCloseRequest = + SQLQueryRequestBuilder.request(null) + .cursor("") + .path("_plugins/_sql/close") + .build(); + + SQLQueryRequest pagingRequest = + SQLQueryRequestBuilder.request(null) + .cursor("pewpew") + .build(); + + assertAll( + () -> assertTrue(closeRequest.isSupported()), + () -> assertTrue(closeRequest.isCursorCloseRequest()), + () -> assertTrue(pagingRequest.isSupported()), + () -> assertFalse(pagingRequest.isCursorCloseRequest()), + () -> assertFalse(emptyCloseRequest.isSupported()), + () -> assertTrue(emptyCloseRequest.isCursorCloseRequest()) + ); + } + @Test public void should_not_support_request_with_empty_cursor() { SQLQueryRequest requestWithEmptyCursor =