Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
Expand All @@ -19,13 +18,18 @@
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
@RequiredArgsConstructor
Comment thread
ahkcs marked this conversation as resolved.
public class Query extends Statement {

protected final UnresolvedPlan plan;
protected final int fetchSize;
private final QueryType queryType;

/**
 * Creates a query statement by storing the given values in the corresponding final fields.
 *
 * @param plan the unresolved plan held by this statement
 * @param fetchSize the fetch size held by this statement
 * @param queryType the query type held by this statement
 */
public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) {
this.plan = plan;
this.fetchSize = fetchSize;
this.queryType = queryType;
}
Comment thread
ahkcs marked this conversation as resolved.
Outdated

@Override
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
return visitor.visitQuery(this, context);
Expand Down
26 changes: 14 additions & 12 deletions core/src/main/java/org/opensearch/sql/executor/QueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public void executeWithCalcite(
QueryProfiling.activate(QueryContext.isProfileEnabled());
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
long analyzeStart = System.nanoTime();
SysLimit sysLimit = SysLimit.fromSettings(settings);
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
CalcitePlanContext.create(buildFrameworkConfig(), sysLimit, queryType);
RelNode relNode = analyze(plan, context);
RelNode calcitePlan = convertToCalcitePlan(relNode, context);
analyzeMetric.set(System.nanoTime() - analyzeStart);
Expand Down Expand Up @@ -236,16 +236,18 @@ public void executePlan(
.getSplit()
.ifPresentOrElse(
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
() ->
executionEngine.execute(
plan(plan),
ExecutionContext.querySizeLimit(
// For pagination, querySizeLimit shouldn't take effect.
// See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize}
plan instanceof LogicalPaginate
? null
: SysLimit.fromSettings(settings).querySizeLimit()),
listener));
() -> {
Integer effectiveLimit;
if (plan instanceof LogicalPaginate) {
// For pagination, querySizeLimit shouldn't take effect.
// See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize}
effectiveLimit = null;
} else {
effectiveLimit = SysLimit.fromSettings(settings).querySizeLimit();
}
executionEngine.execute(
plan(plan), ExecutionContext.querySizeLimit(effectiveLimit), listener);
});
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class QueryPlan extends AbstractPlan {

protected final Optional<Integer> pageSize;

/** Constructor. */
/** Constructor without page size. */
public QueryPlan(
QueryId queryId,
QueryType queryType,
Expand All @@ -43,7 +43,7 @@ public QueryPlan(
this.pageSize = Optional.empty();
}

/** Constructor with page size. */
/** Constructor with page size (for pagination). */
public QueryPlan(
QueryId queryId,
QueryType queryType,
Expand Down
63 changes: 60 additions & 3 deletions docs/user/interfaces/endpoint.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ Explain::
}
}

Cursor
======
Cursor (SQL)
============

Description
-----------

To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now.
To get a paginated response for a SQL query, the user needs to provide the `fetch_size` parameter as part of a normal query. The value of `fetch_size` should be greater than `0`. If `fetch_size` is absent or set to `0`, the query falls back to a non-paginated response. This feature is only available over the `jdbc` format for now.

Example
-------
Expand Down Expand Up @@ -266,3 +266,60 @@ Result set::
"size": 5,
"status": 200
}

Fetch Size (PPL)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mark as experimental. We can target prod at 3.7 release.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

================

Description
-----------

PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be between ``1`` and ``10000``. If ``fetch_size`` is absent or set to ``0``, the system default behavior applies (no ``fetch_size`` limit is imposed).

+--------------------+-------------------------------------+------------------------------------+
| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` |
+====================+=====================================+====================================+
| Purpose | Cursor-based pagination | Response size limiting |
+--------------------+-------------------------------------+------------------------------------+
| Returns cursor? | Yes | No |
+--------------------+-------------------------------------+------------------------------------+
| Can fetch more? | Yes (with cursor) | No (single response) |
+--------------------+-------------------------------------+------------------------------------+
| Maximum value | No hard limit | 10,000 |
+--------------------+-------------------------------------+------------------------------------+

Example
-------

PPL query::

>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
"fetch_size" : 5,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if fetch_size is larger than plugins.query.size_limit? Which one should take effect?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our implementation of the PPL fetch_size API essentially appends a HEAD command at the end of the query. So if fetch_size is larger than plugins.query.size_limit, it behaves the same as a HEAD command larger than plugins.query.size_limit: the result is capped by plugins.query.size_limit.

"query" : "source = accounts | fields firstname, lastname | where age > 20"
}'

Result set::

{
"schema": [
{
"name": "firstname",
"type": "text"
},
{
"name": "lastname",
"type": "text"
}
],
"total": 5,
"datarows": [
["Cherry", "Carey"],
["Lindsey", "Hawkins"],
["Sargent", "Powers"],
["Campos", "Olsen"],
["Savannah", "Kirby"]
],
"size": 5,
"status": 200
}

Note that unlike the SQL response above, there is no ``cursor`` field in the PPL response. The response is complete and final.
2 changes: 1 addition & 1 deletion docs/user/ppl/limitations/limitations.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ For the following functionalities, the query will be forwarded to the V2 query e
* ML
* Kmeans
* `show datasources` and command
* Commands with `fetch_size` parameter
* SQL queries with the `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is SQL commands? Rephrase it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to SQL queries



## Malformed Field Names in Object Fields
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
Expand All @@ -25,8 +26,12 @@

import java.io.IOException;
import java.util.Locale;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.Response;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.setting.Settings.Key;
Expand Down Expand Up @@ -2497,4 +2502,68 @@ public void testExplainMvCombine() throws IOException {
String expected = loadExpectedPlan("explain_mvcombine.yaml");
assertYamlEqualsIgnoreId(expected, actual);
}

// ==================== fetch_size explain tests ====================

@Test
public void testExplainFetchSizePushDown() throws IOException {
  // With fetch_size=5 a Head(5, 0) is injected on top of the plan, so the expected
  // logical plan has a LogicalSort(fetch=[5]) wrapping the Project.
  String expected = loadExpectedPlan("explain_fetch_size_push.yaml");
  String ppl = String.format("source=%s | fields age", TEST_INDEX_ACCOUNT);
  String actual = explainQueryWithFetchSizeYaml(ppl, 5);
  assertYamlEqualsIgnoreId(expected, actual);
}

@Test
public void testExplainFetchSizeWithSmallerHead() throws IOException {
  // fetch_size=10 combined with the user's own "| head 3".
  // Expected plan holds two LogicalSort nodes: the inner fetch=[3] comes from the user's
  // head, the outer fetch=[10] comes from fetch_size; effective limit = min(3, 10) = 3.
  String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml");
  String ppl = String.format("source=%s | head 3 | fields age", TEST_INDEX_ACCOUNT);
  String actual = explainQueryWithFetchSizeYaml(ppl, 10);
  assertYamlEqualsIgnoreId(expected, actual);
}

@Test
public void testExplainFetchSizeSmallerThanHead() throws IOException {
  // fetch_size=5 combined with the user's own "| head 100".
  // Expected plan holds two LogicalSort nodes: the inner fetch=[100] comes from the user's
  // head, the outer fetch=[5] comes from fetch_size; effective limit = min(100, 5) = 5.
  String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml");
  String ppl = String.format("source=%s | head 100 | fields age", TEST_INDEX_ACCOUNT);
  String actual = explainQueryWithFetchSizeYaml(ppl, 5);
  assertYamlEqualsIgnoreId(expected, actual);
}

/**
 * Posts a PPL explain request whose JSON body carries both the query and {@code fetch_size},
 * asserts that the response status is 200, and returns the response body.
 *
 * <p>The query text is inserted into the JSON body verbatim; no escaping is done here, so the
 * caller must pass a string that is already valid inside a JSON string literal.
 *
 * @param query the PPL query string
 * @param fetchSize the value sent as {@code fetch_size} in the request body
 * @return the explain response body (YAML format is requested via the URL)
 * @throws IOException if performing the request or reading the response fails
 */
private String explainQueryWithFetchSizeYaml(String query, int fetchSize) throws IOException {
  String endpoint =
      String.format(
          "/_plugins/_ppl/_explain?format=%s&mode=%s", Format.YAML, ExplainMode.STANDARD);
  String payload =
      String.format(
          Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize);

  Request explainRequest = new Request("POST", endpoint);
  explainRequest.setJsonEntity(payload);

  // Declare the body as JSON explicitly through the request options.
  RequestOptions.Builder optionsBuilder = RequestOptions.DEFAULT.toBuilder();
  optionsBuilder.addHeader("Content-Type", "application/json");
  explainRequest.setOptions(optionsBuilder);

  Response explainResponse = client().performRequest(explainRequest);
  int statusCode = explainResponse.getStatusLine().getStatusCode();
  Assert.assertEquals(200, statusCode);
  return getResponseBody(explainResponse, true);
}
}
Loading
Loading