diff --git a/docs/user/interfaces/endpoint.rst b/docs/user/interfaces/endpoint.rst index 9143ba62dac..26dca94d228 100644 --- a/docs/user/interfaces/endpoint.rst +++ b/docs/user/interfaces/endpoint.rst @@ -208,13 +208,13 @@ Explain:: } } -Cursor -====== +Cursor (SQL) +============ Description ----------- -To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now. +To get paginated response for a SQL query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now. Example ------- @@ -266,3 +266,74 @@ Result set:: "size": 5, "status": 200 } + +Fetch Size (PPL) [Experimental] +================================ + +Description +----------- + +The ``fetch_size`` parameter limits the number of rows returned in a PPL query response. The value of ``fetch_size`` should be greater than ``0``. In absence of ``fetch_size`` or a value of ``0``, the result size is governed by the ``plugins.query.size_limit`` cluster setting. + +``fetch_size`` can be specified either as a URL parameter or in the JSON request body. If both are provided, the JSON body value takes precedence. + +If ``fetch_size`` is larger than ``plugins.query.size_limit``, the result is capped at ``plugins.query.size_limit``. The effective number of rows returned is always ``min(fetch_size, plugins.query.size_limit)``. + +Note +---- + +Unlike SQL's ``fetch_size`` which enables cursor-based pagination, PPL's ``fetch_size`` does not return a cursor and does not support fetching additional pages. The response is always complete and final. + ++--------------------+-------------------------------------+------------------------------------+ +| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` | ++====================+=====================================+====================================+ +| Purpose | Cursor-based pagination | Response size limiting | ++--------------------+-------------------------------------+------------------------------------+ +| Returns cursor? | Yes | No | ++--------------------+-------------------------------------+------------------------------------+ +| Can fetch more? | Yes (with cursor) | No (single response) | ++--------------------+-------------------------------------+------------------------------------+ + +Example 1: JSON body +------- + +PPL query:: + + >> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{ + "fetch_size" : 5, + "query" : "source = accounts | fields firstname, lastname | where age > 20" + }' + +Example 2: URL parameter +------- + +PPL query:: + + >> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl?fetch_size=5 -d '{ + "query" : "source = accounts | fields firstname, lastname | where age > 20" + }' + +Result set:: + + { + "schema": [ + { + "name": "firstname", + "type": "text" + }, + { + "name": "lastname", + "type": "text" + } + ], + "total": 5, + "datarows": [ + ["Cherry", "Carey"], + ["Lindsey", "Hawkins"], + ["Sargent", "Powers"], + ["Campos", "Olsen"], + ["Savannah", "Kirby"] + ], + "size": 5, + "status": 200 + } diff --git a/docs/user/ppl/limitations/limitations.md b/docs/user/ppl/limitations/limitations.md index ac7494386dd..e532f64a790 100644 --- a/docs/user/ppl/limitations/limitations.md +++ b/docs/user/ppl/limitations/limitations.md @@ -62,7 +62,7 @@ For the following functionalities, the query will be forwarded to the V2 query e * ML * Kmeans * `show datasources` and command -* Commands with `fetch_size` parameter +* SQL queries with `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine. ## Malformed Field Names in Object Fields diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 4846480fc1d..b233dd35cc6 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -5,6 +5,7 @@ package org.opensearch.sql.calcite.remote; +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS; import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; @@ -25,8 +26,12 @@ import java.io.IOException; import java.util.Locale; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; import org.opensearch.sql.ast.statement.ExplainMode; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.common.setting.Settings.Key; @@ -2497,4 +2502,68 @@ public void testExplainMvCombine() throws IOException { String expected = loadExpectedPlan("explain_mvcombine.yaml"); assertYamlEqualsIgnoreId(expected, actual); } + + // ==================== fetch_size explain tests ==================== + + @Test + public void testExplainFetchSizePushDown() throws IOException { + // fetch_size=5 injects Head(5, 0) on top of the plan + // Logical plan: LogicalSort(fetch=[5]) wraps the Project + String expected = loadExpectedPlan("explain_fetch_size_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | fields age", TEST_INDEX_ACCOUNT), 5)); + } + + @Test + public void testExplainFetchSizeWithSmallerHead() throws IOException { + // fetch_size=10 with user's | head 3 + // Two LogicalSort nodes: inner fetch=[3] from user head, outer fetch=[10] from fetch_size + // Effective limit = min(3, 10) = 3 + String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | head 3 | fields age", TEST_INDEX_ACCOUNT), 10)); + } + + @Test + public void testExplainFetchSizeSmallerThanHead() throws IOException { + // fetch_size=5 with user's | head 100 + // Two LogicalSort nodes: inner fetch=[100] from user head, outer fetch=[5] from fetch_size + // Effective limit = min(100, 5) = 5 + String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryWithFetchSizeYaml( + String.format("source=%s | head 100 | fields age", TEST_INDEX_ACCOUNT), 5)); + } + + /** + * Send an explain request with fetch_size in the JSON body and return YAML output. + * + * @param query the PPL query string + * @param fetchSize the fetch_size parameter value + * @return the explain output as YAML string + */ + private String explainQueryWithFetchSizeYaml(String query, int fetchSize) throws IOException { + Request request = + new Request( + "POST", + String.format( + "/_plugins/_ppl/_explain?format=%s&mode=%s", Format.YAML, ExplainMode.STANDARD)); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize); + request.setJsonEntity(jsonBody); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return getResponseBody(response, true); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java new file mode 100644 index 00000000000..abd75536d55 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/FetchSizeIT.java @@ -0,0 +1,262 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.legacy.TestUtils.getResponseBody; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Assert; +import org.junit.jupiter.api.Test; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; + +/** Integration tests for PPL fetch_size parameter. */ +public class FetchSizeIT extends PPLIntegTestCase { + + @Override + public void init() throws Exception { + super.init(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testFetchSizeLimitsResults() throws IOException { + // accounts index has 1000 documents, request only 5 + JSONObject result = executeQueryWithFetchSize("source=" + TEST_INDEX_ACCOUNT, 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeWithFields() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | fields firstname, age", TEST_INDEX_ACCOUNT), 3); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(3, dataRows.length()); + } + + @Test + public void testFetchSizeWithFilter() throws IOException { + // Filter + fetch_size + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | where age > 30", TEST_INDEX_ACCOUNT), 10); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(10, dataRows.length()); + } + + @Test + public void testFetchSizeWithSort() throws IOException { + // Sort + fetch_size - should get the first N results after sorting + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | sort age | fields firstname, age", TEST_INDEX_ACCOUNT), 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeWithEval() throws IOException { + // Eval command + fetch_size - ensures fetch_size works with post-processing commands + JSONObject result = + executeQueryWithFetchSize( + String.format( + "source=%s | eval age_plus_10 = age + 10 | fields firstname, age, age_plus_10", + TEST_INDEX_ACCOUNT), + 7); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeWithDedup() throws IOException { + // Dedup command + fetch_size - dedup may return fewer than fetch_size if not enough unique + // values + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | dedup gender | fields gender", TEST_INDEX_ACCOUNT), 100); + JSONArray dataRows = result.getJSONArray("datarows"); + // There are only 2 genders (M, F) in the dataset, so we should get at most 2 + assertTrue(dataRows.length() <= 2); + } + + @Test + public void testFetchSizeWithRename() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format( + "source=%s | rename firstname as first_name | fields first_name, age", + TEST_INDEX_ACCOUNT), + 4); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(4, dataRows.length()); + } + + @Test + public void testFetchSizeZeroReturnsAllResults() throws IOException { + // fetch_size=0 should be treated as "no limit" (use system default) + JSONObject result = executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 0); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeLargerThanDataset() throws IOException { + // When fetch_size is larger than the dataset, return all available results + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 1000); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, so we should get 7, not 1000 + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeAtSystemLimit() throws IOException { + // fetch_size at the default system limit (10000) should work without error + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 10000); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, so we get all of them + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeExceedingSystemLimitIsCapped() throws IOException { + // fetch_size > system limit (10000) is accepted but capped by LogicalSystemLimit + JSONObject result = + executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), 10001); + JSONArray dataRows = result.getJSONArray("datarows"); + // Bank index has 7 documents, result is capped by system limit but dataset is smaller + assertEquals(7, dataRows.length()); + } + + @Test + public void testNegativeFetchSizeReturnsAllResults() throws IOException { + // Negative fetch_size is treated as "no limit" (same as 0) + JSONObject result = executeQueryWithFetchSize(String.format("source=%s", TEST_INDEX_BANK), -1); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + + @Test + public void testFetchSizeOne() throws IOException { + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | fields firstname", TEST_INDEX_ACCOUNT), 1); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(1, dataRows.length()); + } + + @Test + public void testFetchSizeWithStats() throws IOException { + // Stats aggregation - fetch_size should still apply to aggregation results + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | stats count() by gender", TEST_INDEX_ACCOUNT), 100); + JSONArray dataRows = result.getJSONArray("datarows"); + // Stats by gender should return 2 rows (M and F) + assertEquals(2, dataRows.length()); + } + + @Test + public void testFetchSizeWithHead() throws IOException { + // Both head command and fetch_size - the smaller limit should win + // head 3 limits to 3, fetch_size 10 would allow 10, so we get 3 + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | head 3 | fields firstname", TEST_INDEX_ACCOUNT), 10); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(3, dataRows.length()); + } + + @Test + public void testFetchSizeSmallerThanHead() throws IOException { + // fetch_size smaller than head - fetch_size should further limit + // head 100 would return 100, but fetch_size 5 limits to 5 + JSONObject result = + executeQueryWithFetchSize( + String.format("source=%s | head 100 | fields firstname", TEST_INDEX_ACCOUNT), 5); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(5, dataRows.length()); + } + + @Test + public void testFetchSizeAsUrlParameter() throws IOException { + // fetch_size specified as URL parameter instead of JSON body + Request request = new Request("POST", QUERY_API_ENDPOINT + "?fetch_size=5"); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"source=%s | fields firstname\"\n}", TEST_INDEX_ACCOUNT); + request.setJsonEntity(jsonBody); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject result = jsonify(getResponseBody(response, true)); + assertEquals(5, result.getJSONArray("datarows").length()); + } + + @Test + public void testFetchSizeJsonBodyTakesPrecedenceOverUrlParam() throws IOException { + // JSON body fetch_size=3 should take precedence over URL param fetch_size=10 + Request request = new Request("POST", QUERY_API_ENDPOINT + "?fetch_size=10"); + String jsonBody = + String.format( + Locale.ROOT, + "{\n \"query\": \"source=%s | fields firstname\",\n \"fetch_size\": 3\n}", + TEST_INDEX_ACCOUNT); + request.setJsonEntity(jsonBody); + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + JSONObject result = jsonify(getResponseBody(response, true)); + assertEquals(3, result.getJSONArray("datarows").length()); + } + + @Test + public void testWithoutFetchSizeReturnsDefaultBehavior() throws IOException { + // Without fetch_size, should return results up to system default + JSONObject result = executeQuery(String.format("source=%s", TEST_INDEX_BANK)); + JSONArray dataRows = result.getJSONArray("datarows"); + assertEquals(7, dataRows.length()); + } + + /** + * Execute a PPL query with fetch_size parameter. + * + * @param query the PPL query string + * @param fetchSize the maximum number of results to return + * @return the JSON response + */ + protected JSONObject executeQueryWithFetchSize(String query, int fetchSize) throws IOException { + Request request = new Request("POST", QUERY_API_ENDPOINT); + String jsonBody = + String.format( + Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize); + request.setJsonEntity(jsonBody); + + RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder(); + restOptionsBuilder.addHeader("Content-Type", "application/json"); + request.setOptions(restOptionsBuilder); + + Response response = client().performRequest(request); + Assert.assertEquals(200, response.getStatusLine().getStatusCode()); + return jsonify(getResponseBody(response, true)); + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml new file mode 100644 index 00000000000..d35c47cf5cb --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_push.yaml @@ -0,0 +1,8 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml new file mode 100644 index 00000000000..7c80ddf56df --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_smaller_than_head_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[100]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->100, LIMIT->5, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml new file mode 100644 index 00000000000..ba828e445b5 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_fetch_size_with_head_push.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[10]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[3]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->3, LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":3,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=3, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml new file mode 100644 index 00000000000..ca4931ec61a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_push.yaml @@ -0,0 +1,11 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[5]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml new file mode 100644 index 00000000000..a3099df5ff1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_smaller_than_head_push.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[5]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[100]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[5]) + EnumerableLimit(fetch=[100]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml new file mode 100644 index 00000000000..6fb5ef8a97c --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_fetch_size_with_head_push.yaml @@ -0,0 +1,13 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalSort(fetch=[10]) + LogicalProject(age=[$8]) + LogicalSort(fetch=[3]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableLimit(fetch=[10]) + EnumerableCalc(expr#0..16=[{inputs}], age=[$t8]) + EnumerableLimit(fetch=[3]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java index 0dd8e1a9651..0d07dab966a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/request/PPLQueryRequestFactory.java @@ -30,6 +30,7 @@ public class PPLQueryRequestFactory { private static final String DEFAULT_EXPLAIN_MODE = "standard"; private static final String QUERY_PARAMS_PRETTY = "pretty"; private static final String QUERY_PARAMS_PROFILE = "profile"; + private static final String QUERY_PARAMS_FETCH_SIZE = "fetch_size"; /** * Build {@link PPLQueryRequest} from {@link RestRequest}. @@ -84,6 +85,18 @@ private static PPLQueryRequest parsePPLRequestFromPayload(RestRequest restReques String queryString = jsonContent.optString(PPL_FIELD_NAME, ""); boolean enableProfile = profileRequested && isProfileSupported(restRequest.path(), format, queryString); + // Support fetch_size as a URL parameter if not already in the JSON body + if (!jsonContent.has(QUERY_PARAMS_FETCH_SIZE) + && restRequest.params().containsKey(QUERY_PARAMS_FETCH_SIZE)) { + try { + jsonContent.put( + QUERY_PARAMS_FETCH_SIZE, + Integer.parseInt(restRequest.params().get(QUERY_PARAMS_FETCH_SIZE))); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid fetch_size parameter: must be a valid integer", e); + } + } PPLQueryRequest pplRequest = new PPLQueryRequest( jsonContent.getString(PPL_FIELD_NAME), diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index 54efb861cf5..ffdd90504f7 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -77,7 +77,7 @@ public String getName() { @Override protected Set responseParams() { Set responseParams = new HashSet<>(super.responseParams()); - responseParams.addAll(Arrays.asList("format", "mode", "sanitize")); + responseParams.addAll(Arrays.asList("format", "mode", "sanitize", "fetch_size")); return responseParams; } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java index ecae97283ed..d6f025a4540 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/PPLService.java @@ -97,6 +97,7 @@ private AbstractPlan plan( new AstBuilder(request.getRequest(), settings), AstStatementBuilder.StatementBuilderContext.builder() .isExplain(request.isExplainRequest()) + .fetchSize(request.getFetchSize()) .format(request.getFormat()) .explainMode(request.getExplainMode()) .build())); diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index ef21e0f2803..caf666b3b4e 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -18,6 +18,7 @@ public class PPLQueryRequest { private static final String DEFAULT_PPL_PATH = "/_plugins/_ppl"; + private static final String FETCH_SIZE_FIELD = "fetch_size"; public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, DEFAULT_PPL_PATH, ""); @@ -93,4 +94,19 @@ public Format format() { public ExplainMode mode() { return ExplainMode.of(explainMode); } + + /** + * Get the maximum number of results to return. Unlike SQL's fetch_size which enables cursor-based + * pagination, PPL's fetch_size simply limits the response to N rows without cursor support. The + * effective upper bound is governed by the {@code plugins.query.size_limit} cluster setting + * (defaults to {@code index.max_result_window}, which is 10000). + * + * @return fetch_size value from request, or 0 if not specified (meaning use system default) + */ + public int getFetchSize() { + if (jsonContent == null) { + return 0; + } + return jsonContent.optInt(FETCH_SIZE_FIELD, 0); + } } diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java index 9d8338ecea7..cee084bc71b 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstStatementBuilder.java @@ -15,6 +15,7 @@ import org.opensearch.sql.ast.statement.Explain; import org.opensearch.sql.ast.statement.Query; import org.opensearch.sql.ast.statement.Statement; +import org.opensearch.sql.ast.tree.Head; import org.opensearch.sql.ast.tree.Project; import org.opensearch.sql.ast.tree.UnresolvedPlan; import org.opensearch.sql.ppl.antlr.parser.OpenSearchPPLParser; @@ -30,7 +31,12 @@ public class AstStatementBuilder extends OpenSearchPPLParserBaseVisitor 0) { + rawPlan = new Head(context.getFetchSize(), 0).attach(rawPlan); + } + UnresolvedPlan plan = addSelectAll(rawPlan); + Query query = new Query(plan, 0, PPL); if (ctx.explainStatement() != null) { if (ctx.explainStatement().explainMode() == null) { return new Explain(query, PPL); @@ -51,7 +57,14 @@ protected Statement aggregateResult(Statement aggregate, Statement nextResult) { @Builder public static class StatementBuilderContext { private final boolean isExplain; + + /** + * Maximum number of results to return. 0 means use system default. Unlike SQL's fetch_size + * which enables cursor-based pagination, PPL's fetch_size limits the response to N rows without + * cursor support. + */ private final int fetchSize; + private final String format; private final String explainMode; } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java index 5db142a6506..4faadd4850c 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/PPLServiceTest.java @@ -102,7 +102,7 @@ public void onFailure(Exception e) { public void testExecuteShouldPass() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) @@ -119,7 +119,7 @@ public void testExecuteShouldPass() { public void testExecuteCsvFormatShouldPass() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) @@ -173,7 +173,7 @@ public void testExplainWithIllegalQueryShouldBeCaughtByHandler() { public void testPrometheusQuery() { doAnswer( invocation -> { - ResponseListener listener = invocation.getArgument(1); + ResponseListener listener = invocation.getArgument(2); listener.onResponse(new QueryResponse(schema, Collections.emptyList(), Cursor.None)); return null; }) diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java index f4e90395cb5..9fe3a4fef64 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/domain/PPLQueryRequestTest.java @@ -8,6 +8,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import org.json.JSONObject; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -55,4 +56,40 @@ public void testUnsupportedFormat() { exceptionRule.expectMessage("response in " + format + " format is not supported."); request.format(); } + + @Test + public void testGetFetchSizeReturnsValueFromJson() { + JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 100}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(100, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeReturnsZeroWhenNotSpecified() { + JSONObject json = new JSONObject("{\"query\": \"source=t\"}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeReturnsZeroWhenJsonContentIsNull() { + PPLQueryRequest request = new PPLQueryRequest("source=t", null, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeHandlesExplicitNull() { + JSONObject json = new JSONObject(); + json.put("query", "source=t"); + json.put("fetch_size", JSONObject.NULL); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(0, request.getFetchSize()); + } + + @Test + public void testGetFetchSizeWithLargeValue() { + JSONObject json = new JSONObject("{\"query\": \"source=t\", \"fetch_size\": 15000}"); + PPLQueryRequest request = new PPLQueryRequest("source=t", json, "/_plugins/_ppl"); + assertEquals(15000, request.getFetchSize()); + } } diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java index 568a771732f..4229bbc8af3 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstStatementBuilderTest.java @@ -58,6 +58,80 @@ public void buildExplainStatement() { new Explain(new Query(project(search(relation("t"), "a:1"), AllFields.of()), 0, PPL), PPL)); } + @Test + public void buildQueryStatementWithFetchSize() { + // When fetchSize > 0, a Head node is injected below Project (addSelectAll wraps the top) + assertEqualWithFetchSize( + "search source=t a=1", + 100, + new Query(project(head(search(relation("t"), "a:1"), 100, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeZero() { + // fetchSize=0 means use system default - no Head node injected + assertEqualWithFetchSize( + "search source=t a=1", + 0, + new Query(project(search(relation("t"), "a:1"), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithLargeFetchSize() { + assertEqualWithFetchSize( + "search source=t a=1", + 10000, + new Query(project(head(search(relation("t"), "a:1"), 10000, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeAndSmallerHead() { + // User query has head 3, fetchSize=10 + // Head(10) wraps Head(3), then Project(*) wraps on top + // The inner head 3 limits first, so only 3 rows are returned + assertEqualWithFetchSize( + "source=t | head 3", + 10, + new Query(project(head(head(relation("t"), 3, 0), 10, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeSmallerThanHead() { + // User query has head 100, fetchSize=5 + // Head(5) wraps Head(100), then Project(*) wraps on top + // The outer head 5 limits, so only 5 rows are returned + assertEqualWithFetchSize( + "source=t | head 100", + 5, + new Query(project(head(head(relation("t"), 100, 0), 5, 0), AllFields.of()), 0, PPL)); + } + + @Test + public void buildQueryStatementWithFetchSizeAndHeadWithOffset() { + // User query has head 3 from 1 (with offset), fetchSize=10 + // The inner head offset is preserved, outer Head always has offset 0 + assertEqualWithFetchSize( + "source=t | head 3 from 1", + 10, + new Query(project(head(head(relation("t"), 3, 1), 10, 0), AllFields.of()), 0, PPL)); + } + + private void assertEqualWithFetchSize(String query, int fetchSize, Statement expectedStatement) { + Node actualPlan = planWithFetchSize(query, fetchSize); + assertEquals(expectedStatement, actualPlan); + } + + private Node planWithFetchSize(String query, int fetchSize) { + final AstStatementBuilder builder = + new AstStatementBuilder( + new AstBuilder(query, settings), + AstStatementBuilder.StatementBuilderContext.builder() + .isExplain(false) + .fetchSize(fetchSize) + .build()); + return builder.visit(parser.parse(query)); + } + private void assertEqual(String query, Statement expectedStatement) { Node actualPlan = plan(query, false); assertEquals(expectedStatement, actualPlan);