From 7b6762d6cd5265a9c3e0bff7d4b69c43387cecab Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 24 Feb 2025 16:20:53 -0800 Subject: [PATCH 1/3] Deprecate scroll API usage Signed-off-by: Tomoyuki Morita --- .../sql/common/setting/Settings.java | 1 - docs/user/admin/settings.rst | 44 ----- .../org/opensearch/sql/ppl/StandaloneIT.java | 1 - .../sql/sql/StandalonePaginationIT.java | 1 - .../sql/legacy/cursor/DefaultCursor.java | 45 ++--- .../legacy/executor/ElasticHitsExecutor.java | 46 ++--- .../executor/cursor/CursorCloseExecutor.java | 30 +--- .../executor/cursor/CursorResultExecutor.java | 41 ++--- .../format/PrettyFormatRestExecutor.java | 42 ++--- .../executor/format/SelectResultSet.java | 25 +-- .../executor/join/ElasticJoinExecutor.java | 20 +-- .../legacy/executor/multi/MinusExecutor.java | 30 ++-- .../sql/legacy/query/DefaultQueryAction.java | 20 +-- .../query/planner/logical/node/TableScan.java | 9 +- .../node/pointInTime/PointInTime.java | 11 +- .../format/PrettyFormatRestExecutorTest.java | 22 --- .../unittest/cursor/DefaultCursorTest.java | 20 --- .../unittest/planner/QueryPlannerTest.java | 158 ++++++++++-------- .../query/DefaultQueryActionTest.java | 73 -------- .../request/OpenSearchRequestBuilder.java | 6 +- .../setting/OpenSearchSettings.java | 14 -- .../storage/scan/OpenSearchIndexScan.java | 11 +- .../OpenSearchExecutionEngineTest.java | 1 - .../OpenSearchExecutionProtectorTest.java | 2 - .../request/OpenSearchRequestBuilderTest.java | 107 ------------ .../storage/OpenSearchIndexTest.java | 3 - .../OpenSearchIndexScanPaginationTest.java | 3 - .../storage/scan/OpenSearchIndexScanTest.java | 46 ----- 28 files changed, 194 insertions(+), 638 deletions(-) diff --git a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java index cbf0e192762..85d2129fcb8 100644 --- a/common/src/main/java/org/opensearch/sql/common/setting/Settings.java +++ b/common/src/main/java/org/opensearch/sql/common/setting/Settings.java @@ -23,7 +23,6 @@ public enum Key { SQL_SLOWLOG("plugins.sql.slowlog"), SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"), SQL_DELETE_ENABLED("plugins.sql.delete.enabled"), - SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"), /** PPL Settings. */ PPL_ENABLED("plugins.ppl.enabled"), diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index b4265deaf73..4bef4f16704 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -160,50 +160,6 @@ Result set:: } } -plugins.sql.pagination.api -================================ - -Description ------------ - -This setting controls whether the SQL search queries in OpenSearch use Point-In-Time (PIT) with search_after or the traditional scroll mechanism for fetching paginated results. - -1. Default Value: true -2. Possible Values: true or false -3. When set to true, the search query in the background uses PIT with search_after instead of scroll to retrieve paginated results. The Cursor Id returned to the user will encode relevant pagination query-related information, which will be used to fetch the subsequent pages of results. -4. This setting is node-level. -5. This setting can be updated dynamically. - - -Example -------- - -You can update the setting with a new value like this. - -SQL query:: - - >> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{ - "transient" : { - "plugins.sql.pagination.api" : "true" - } - }' - -Result set:: - - { - "acknowledged" : true, - "persistent" : { }, - "transient" : { - "plugins" : { - "sql" : { - "pagination" : { - "api" : "true" - } - } - } - } - } - plugins.query.size_limit =========================== diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java index 74f6c7d11ba..cf51f03bfce 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/StandaloneIT.java @@ -152,7 +152,6 @@ private Settings defaultSettings() { private final Map defaultSettings = new ImmutableMap.Builder() .put(Key.QUERY_SIZE_LIMIT, 200) - .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) .put(Key.FIELD_TYPE_TOLERANCE, true) .build(); diff --git a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java index f6951f4a2c1..4bff7611faf 100644 --- a/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/sql/StandalonePaginationIT.java @@ -166,7 +166,6 @@ private Settings defaultSettings() { new ImmutableMap.Builder() .put(Key.QUERY_SIZE_LIMIT, 200) .put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1)) - .put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true) .put(Key.FIELD_TYPE_TOLERANCE, true) .build(); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java index 2b0de9022c2..2d6e1ddb7a9 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java @@ -6,7 +6,6 @@ package org.opensearch.sql.legacy.cursor; import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -38,7 +37,6 @@ import org.opensearch.index.query.QueryBuilder; import org.opensearch.search.SearchModule; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.executor.format.Schema; /** @@ -133,24 +131,21 @@ public String generateCursorId() { json.put(INDEX_PATTERN, indexPattern); json.put(SCHEMA_COLUMNS, getSchemaAsJson()); json.put(FIELD_ALIAS_MAP, fieldAliasMap); - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - json.put(PIT_ID, pitId); - String sortFieldValue = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - return objectMapper.writeValueAsString(sortFields); - } catch (JsonProcessingException e) { - throw new RuntimeException( - "Failed to parse sort fields from JSON string.", e); - } - }); - json.put(SORT_FIELDS, sortFieldValue); - setSearchRequestString(json, searchSourceBuilder); - } else { - json.put(SCROLL_ID, scrollId); - } + json.put(PIT_ID, pitId); + String sortFieldValue = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + return objectMapper.writeValueAsString(sortFields); + } catch (JsonProcessingException e) { + throw new RuntimeException( + "Failed to parse sort fields from JSON string.", e); + } + }); + json.put(SORT_FIELDS, sortFieldValue); + setSearchRequestString(json, searchSourceBuilder); + return String.format("%s:%s", type.getId(), encodeCursor(json)); } @@ -169,9 +164,7 @@ private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder s } private boolean isCursorIdNullOrEmpty() { - return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER) - ? Strings.isNullOrEmpty(pitId) - : Strings.isNullOrEmpty(scrollId); + return Strings.isNullOrEmpty(pitId); } public static DefaultCursor from(String cursorId) { @@ -184,11 +177,7 @@ public static DefaultCursor from(String cursorId) { cursor.setFetchSize(json.getInt(FETCH_SIZE)); cursor.setRowsLeft(json.getLong(ROWS_LEFT)); cursor.setIndexPattern(json.getString(INDEX_PATTERN)); - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - populateCursorForPit(json, cursor); - } else { - cursor.setScrollId(json.getString(SCROLL_ID)); - } + populateCursorForPit(json, cursor); cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS))); cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP))); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java index cf5189b3c98..7ae114f3f10 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/ElasticHitsExecutor.java @@ -7,8 +7,6 @@ import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME; import static org.opensearch.search.sort.SortOrder.ASC; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID; import java.io.IOException; @@ -16,12 +14,10 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; -import org.opensearch.common.unit.TimeValue; import org.opensearch.search.SearchHits; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.SortOrder; import org.opensearch.sql.legacy.domain.Select; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.pit.PointInTimeHandler; import org.opensearch.transport.client.Client; @@ -67,36 +63,20 @@ public SearchResponse getResponseWithHits( request.setSize(size); SearchResponse responseWithHits; - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - // Set sort field for search_after - boolean ordered = select.isOrderdSelect(); - if (!ordered) { - request.addSort(DOC_FIELD_NAME, ASC); - request.addSort(METADATA_FIELD_ID, SortOrder.ASC); - } - // Set PIT - request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - // from and size is alternate method to paginate result. - // If select has from clause, search after is not required. - if (previousResponse != null && select.getFrom().isEmpty()) { - request.searchAfter(previousResponse.getHits().getSortFields()); - } - responseWithHits = request.get(); - } else { - // Set scroll - TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE); - if (previousResponse != null) { - responseWithHits = - client - .prepareSearchScroll(previousResponse.getScrollId()) - .setScroll(keepAlive) - .execute() - .actionGet(); - } else { - request.setScroll(keepAlive); - responseWithHits = request.get(); - } + // Set sort field for search_after + boolean ordered = select.isOrderdSelect(); + if (!ordered) { + request.addSort(DOC_FIELD_NAME, ASC); + request.addSort(METADATA_FIELD_ID, SortOrder.ASC); } + // Set PIT + request.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + // from and size is alternate method to paginate result. + // If select has from clause, search after is not required. + if (previousResponse != null && select.getFrom().isEmpty()) { + request.searchAfter(previousResponse.getHits().getSortFields()); + } + responseWithHits = request.get(); return responseWithHits; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java index f9c2870c0fa..9d83d616c78 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java @@ -6,7 +6,6 @@ package org.opensearch.sql.legacy.executor.cursor; import static org.opensearch.core.rest.RestStatus.OK; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -18,7 +17,6 @@ import org.opensearch.rest.RestChannel; import org.opensearch.sql.legacy.cursor.CursorType; import org.opensearch.sql.legacy.cursor.DefaultCursor; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.pit.PointInTimeHandler; @@ -83,26 +81,14 @@ public String execute(Client client, Map params) throws Exceptio } private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - String pitId = cursor.getPitId(); - PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); - try { - pit.delete(); - return SUCCEEDED_TRUE; - } catch (RuntimeException e) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - return SUCCEEDED_FALSE; - } - } else { - String scrollId = cursor.getScrollId(); - ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(scrollId).get(); - if (clearScrollResponse.isSucceeded()) { - return SUCCEEDED_TRUE; - } else { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - return SUCCEEDED_FALSE; - } + String pitId = cursor.getPitId(); + PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); + try { + pit.delete(); + return SUCCEEDED_TRUE; + } catch (RuntimeException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + return SUCCEEDED_FALSE; } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java index 47b71367910..8adffea526e 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorResultExecutor.java @@ -7,7 +7,6 @@ import static org.opensearch.core.rest.RestStatus.OK; import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import java.util.Arrays; import java.util.Map; @@ -111,19 +110,15 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) { TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE); SearchResponse scrollResponse = null; - if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - String pitId = cursor.getPitId(); - SearchSourceBuilder source = cursor.getSearchSourceBuilder(); - source.searchAfter(cursor.getSortFields()); - source.pointInTimeBuilder(new PointInTimeBuilder(pitId)); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.source(source); - scrollResponse = client.search(searchRequest).actionGet(); - } else { - String previousScrollId = cursor.getScrollId(); - scrollResponse = - client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get(); - } + + String pitId = cursor.getPitId(); + SearchSourceBuilder source = cursor.getSearchSourceBuilder(); + source.searchAfter(cursor.getSortFields()); + source.pointInTimeBuilder(new PointInTimeBuilder(pitId)); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(source); + scrollResponse = client.search(searchRequest).actionGet(); + SearchHits searchHits = scrollResponse.getHits(); SearchHit[] searchHitArray = searchHits.getHits(); String newScrollId = scrollResponse.getScrollId(); @@ -173,17 +168,13 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) { } cursor.setRowsLeft(rowsLeft); - if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - cursor.setPitId(newPitId); - cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder()); - cursor.setSortFields( - scrollResponse - .getHits() - .getAt(scrollResponse.getHits().getHits().length - 1) - .getSortValues()); - } else { - cursor.setScrollId(newScrollId); - } + cursor.setPitId(newPitId); + cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder()); + cursor.setSortFields( + scrollResponse + .getHits() + .getAt(scrollResponse.getHits().getHits().length - 1) + .getSortValues()); Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor); return protocol.cursorFormat(); } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 473088d0408..3d5ee39976a 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -5,8 +5,6 @@ package org.opensearch.sql.legacy.executor.format; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; - import java.util.Map; import java.util.Objects; import org.apache.logging.log4j.LogManager; @@ -21,7 +19,6 @@ import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.sql.legacy.cursor.Cursor; import org.opensearch.sql.legacy.cursor.DefaultCursor; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.QueryActionElasticExecutor; import org.opensearch.sql.legacy.executor.RestExecutor; @@ -102,29 +99,24 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction PointInTimeHandler pit = null; SearchResponse response; SqlOpenSearchRequestBuilder sqlOpenSearchRequestBuilder = queryAction.explain(); - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); - pit.create(); - SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); - searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); - response = searchRequest.get(); - } else { - response = (SearchResponse) sqlOpenSearchRequestBuilder.get(); - } + + pit = new PointInTimeHandlerImpl(client, queryAction.getSelect().getIndexArr()); + pit.create(); + SearchRequestBuilder searchRequest = queryAction.getRequestBuilder(); + searchRequest.setPointInTime(new PointInTimeBuilder(pit.getPitId())); + response = searchRequest.get(); Protocol protocol; if (isDefaultCursor(response, queryAction)) { DefaultCursor defaultCursor = new DefaultCursor(); defaultCursor.setLimit(queryAction.getSelect().getRowCount()); defaultCursor.setFetchSize(queryAction.getSqlRequest().fetchSize()); - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - defaultCursor.setPitId(pit.getPitId()); - defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); - defaultCursor.setSortFields( - response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); - } else { - defaultCursor.setScrollId(response.getScrollId()); - } + + defaultCursor.setPitId(pit.getPitId()); + defaultCursor.setSearchSourceBuilder(queryAction.getRequestBuilder().request().source()); + defaultCursor.setSortFields( + response.getHits().getAt(response.getHits().getHits().length - 1).getSortValues()); + protocol = new Protocol(client, queryAction, response.getHits(), format, defaultCursor); } else { protocol = new Protocol(client, queryAction, response.getHits(), format, Cursor.NULL_CURSOR); @@ -134,12 +126,8 @@ private Protocol buildProtocolForDefaultQuery(Client client, DefaultQueryAction } protected boolean isDefaultCursor(SearchResponse searchResponse, DefaultQueryAction queryAction) { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - return queryAction.getSqlRequest().fetchSize() != 0 - && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() - >= queryAction.getSqlRequest().fetchSize(); - } else { - return !Strings.isNullOrEmpty(searchResponse.getScrollId()); - } + return queryAction.getSqlRequest().fetchSize() != 0 + && Objects.requireNonNull(searchResponse.getHits().getTotalHits()).value() + >= queryAction.getSqlRequest().fetchSize(); } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java index 9c272005d69..cf2dbc6c997 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java @@ -574,24 +574,13 @@ private void populateDefaultCursor(DefaultCursor cursor) { long rowsLeft = rowsLeft(cursor.getFetchSize(), cursor.getLimit()); if (rowsLeft <= 0) { // Delete Point In Time ID - if (LocalClusterState.state().getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { - String pitId = cursor.getPitId(); - PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); - try { - pit.delete(); - } catch (RuntimeException e) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.info("Error deleting point in time {} ", pitId); - } - } else { - // close the cursor - String scrollId = cursor.getScrollId(); - ClearScrollResponse clearScrollResponse = - client.prepareClearScroll().addScrollId(scrollId).get(); - if (!clearScrollResponse.isSucceeded()) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.error("Error closing the cursor context {} ", scrollId); - } + String pitId = cursor.getPitId(); + PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId); + try { + pit.delete(); + } catch (RuntimeException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error deleting point in time {} ", pitId); } return; } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java index 286dd0b400c..145d9081956 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java @@ -5,8 +5,6 @@ package org.opensearch.sql.legacy.executor.join; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; - import java.io.IOException; import java.util.Collection; import java.util.HashMap; @@ -90,10 +88,8 @@ public void sendResponse(RestChannel channel) throws IOException { public void run() throws IOException, SqlParseException { try { long timeBefore = System.currentTimeMillis(); - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - pit = new PointInTimeHandlerImpl(client, indices); - pit.create(); - } + pit = new PointInTimeHandlerImpl(client, indices); + pit.create(); results = innerRun(); long joinTimeInMilli = System.currentTimeMillis() - timeBefore; this.metaResults.setTookImMilli(joinTimeInMilli); @@ -101,13 +97,11 @@ public void run() throws IOException, SqlParseException { LOG.error("Failed during join query run.", e); throw new IllegalStateException("Error occurred during join query run", e); } finally { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - try { - pit.delete(); - } catch (RuntimeException e) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.info("Error deleting point in time {} ", pit); - } + try { + pit.delete(); + } catch (RuntimeException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error deleting point in time {} ", pit); } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java index af60accfe3c..931e0d3243c 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java @@ -5,8 +5,6 @@ package org.opensearch.sql.legacy.executor.multi; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; - import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -68,15 +66,13 @@ public MinusExecutor(Client client, MultiQueryRequestBuilder builder) { @Override public void run() throws SqlParseException { try { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - pit = - new PointInTimeHandlerImpl( - client, - ArrayUtils.concat( - builder.getOriginalSelect(true).getIndexArr(), - builder.getOriginalSelect(false).getIndexArr())); - pit.create(); - } + pit = + new PointInTimeHandlerImpl( + client, + ArrayUtils.concat( + builder.getOriginalSelect(true).getIndexArr(), + builder.getOriginalSelect(false).getIndexArr())); + pit.create(); if (this.useTermsOptimization && this.fieldsOrderFirstTable.length != 1) { throw new SqlParseException( @@ -121,13 +117,11 @@ public void run() throws SqlParseException { } catch (Exception e) { LOG.error("Failed during multi query run.", e); } finally { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - try { - pit.delete(); - } catch (RuntimeException e) { - Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); - LOG.info("Error deleting point in time {} ", pit); - } + try { + pit.delete(); + } catch (RuntimeException e) { + Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment(); + LOG.info("Error deleting point in time {} ", pit); } } } diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java index 5c622198501..f317d81f4c3 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/DefaultQueryAction.java @@ -5,7 +5,6 @@ package org.opensearch.sql.legacy.query; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID; import com.alibaba.druid.sql.ast.SQLExpr; @@ -104,19 +103,14 @@ public void checkAndSetScroll() { .increment(); Metrics.getInstance().getNumericalMetric(MetricName.DEFAULT_CURSOR_REQUEST_TOTAL).increment(); request.setSize(fetchSize); - // Set scroll or search after for pagination - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - // search after requires results to be in specific order - // set sort field for search_after - boolean ordered = select.isOrderdSelect(); - if (!ordered) { - request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); - request.addSort(METADATA_FIELD_ID, SortOrder.ASC); - } - // Request also requires PointInTime, but we should create pit while execution. - } else { - request.setScroll(timeValue); + // search after requires results to be in specific order + // set sort field for search_after + boolean ordered = select.isOrderdSelect(); + if (!ordered) { + request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); + request.addSort(METADATA_FIELD_ID, SortOrder.ASC); } + // Request also requires PointInTime, but we should create pit while execution. } else { request.setSearchType(SearchType.DFS_QUERY_THEN_FETCH); setLimit(select.getOffset(), rowCount != null ? rowCount : Select.DEFAULT_LIMIT); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java index 59e6f272162..743b6be1ad0 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/logical/node/TableScan.java @@ -5,16 +5,12 @@ package org.opensearch.sql.legacy.query.planner.logical.node; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; - import java.util.Map; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.query.join.TableInJoinRequestBuilder; import org.opensearch.sql.legacy.query.planner.core.PlanNode; import org.opensearch.sql.legacy.query.planner.logical.LogicalOperator; import org.opensearch.sql.legacy.query.planner.physical.PhysicalOperator; import org.opensearch.sql.legacy.query.planner.physical.node.pointInTime.PointInTime; -import org.opensearch.sql.legacy.query.planner.physical.node.scroll.Scroll; /** Table scan */ public class TableScan implements LogicalOperator { @@ -37,10 +33,7 @@ public PlanNode[] children() { @Override public PhysicalOperator[] toPhysical(Map> optimalOps) { - if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) { - return new PhysicalOperator[] {new PointInTime(request, pageSize)}; - } - return new PhysicalOperator[] {new Scroll(request, pageSize)}; + return new PhysicalOperator[] {new PointInTime(request, pageSize)}; } @Override diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java index a879a21ee8f..207f7efa178 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/query/planner/physical/node/pointInTime/PointInTime.java @@ -3,6 +3,7 @@ import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID; import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.SearchHit; import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.FieldSortBuilder; import org.opensearch.search.sort.SortOrder; @@ -54,13 +55,9 @@ protected void loadFirstBatch() { @Override protected void loadNextBatch() { // Add PIT with search after to fetch next batch of data - if (searchResponse.getHits().getHits() != null - && searchResponse.getHits().getHits().length > 0) { - Object[] sortValues = - searchResponse - .getHits() - .getHits()[searchResponse.getHits().getHits().length - 1] - .getSortValues(); + SearchHit[] hits = searchResponse.getHits().getHits(); + if (hits != null && hits.length > 0) { + Object[] sortValues = hits[hits.length - 1].getSortValues(); LOG.info("Loading next batch of response using Point In Time. - " + pitId); searchResponse = diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java index 1387412d37b..237e13c7a1e 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutorTest.java @@ -4,7 +4,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER; import org.apache.lucene.search.TotalHits; import org.junit.Before; @@ -24,7 +23,6 @@ public class PrettyFormatRestExecutorTest { @Mock private SearchResponse searchResponse; - @Mock private SearchHits searchHits; @Mock private SearchHit searchHit; @Mock private DefaultQueryAction queryAction; @Mock private SqlRequest sqlRequest; @@ -34,8 +32,6 @@ public class PrettyFormatRestExecutorTest { public void setUp() { OpenSearchSettings settings = mock(OpenSearchSettings.class); LocalClusterState.state().setPluginSettings(settings); - when(LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(true); when(queryAction.getSqlRequest()).thenReturn(sqlRequest); executor = new PrettyFormatRestExecutor("jdbc"); } @@ -68,22 +64,4 @@ public void testIsDefaultCursor_totalHitsGreaterThanOrEqualToFetchSize() { assertTrue(executor.isDefaultCursor(searchResponse, queryAction)); } - - @Test - public void testIsDefaultCursor_PaginationApiDisabled() { - when(LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(false); - when(searchResponse.getScrollId()).thenReturn("someScrollId"); - - assertTrue(executor.isDefaultCursor(searchResponse, queryAction)); - } - - @Test - public void testIsDefaultCursor_PaginationApiDisabled_NoScrollId() { - when(LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(false); - when(searchResponse.getScrollId()).thenReturn(null); - - assertFalse(executor.isDefaultCursor(searchResponse, queryAction)); - } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java index deff7132b0d..292f3ef88ca 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java @@ -39,7 +39,6 @@ public void setUp() { MockitoAnnotations.openMocks(this); // Required for Pagination queries using PIT instead of Scroll doReturn(Collections.emptyList()).when(settings).getSettings(); - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); LocalClusterState.state().setPluginSettings(settings); // Mock the toXContent method of SearchSourceBuilder @@ -73,25 +72,6 @@ public void cursorShouldStartWithCursorTypeIDForPIT() { assertThat(cursor.generateCursorId(), startsWith(cursor.getType().getId() + ":")); } - @Test - public void cursorShouldStartWithCursorTypeIDForScroll() { - // Disable PIT for pagination and use scroll instead - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - - DefaultCursor cursor = new DefaultCursor(); - cursor.setRowsLeft(50); - cursor.setScrollId("dbdskbcdjksbcjkdsbcjk+//"); - cursor.setIndexPattern("myIndex"); - cursor.setFetchSize(500); - cursor.setFieldAliasMap(Collections.emptyMap()); - cursor.setColumns(new ArrayList<>()); - - // Set the mocked SearchSourceBuilder to the cursor - cursor.setSearchSourceBuilder(sourceBuilder); - - assertThat(cursor.generateCursorId(), startsWith(cursor.getType().getId() + ":")); - } - @Test public void nullCursorWhenRowLeftIsLessThanEqualZero() { DefaultCursor cursor = new DefaultCursor(); diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java index 0ff8143bab2..3394e5be5e0 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java @@ -7,7 +7,7 @@ import static java.util.Collections.emptyList; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -30,18 +30,24 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import org.opensearch.action.search.ClearScrollRequestBuilder; -import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitAction; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.SearchAction; +import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; import org.opensearch.action.search.SearchScrollRequestBuilder; import org.opensearch.cluster.ClusterName; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.bytes.BytesArray; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.domain.JoinSelect; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; @@ -65,10 +71,10 @@ public abstract class QueryPlannerTest { @Mock protected Client client; @Mock private SearchResponse response1; - private static final String SCROLL_ID1 = "1"; + private static final String PIT_ID1 = "1"; @Mock private SearchResponse response2; - private static final String SCROLL_ID2 = "2"; + private static final String PIT_ID2 = "2"; @Mock private ClusterSettings clusterSettings; @@ -96,7 +102,7 @@ public static void initLogger() { */ @Before - public void init() { + public void init() throws Exception { MockitoAnnotations.initMocks(this); when(clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING)).thenReturn(ClusterName.DEFAULT); OpenSearchSettings settings = spy(new OpenSearchSettings(clusterSettings)); @@ -105,77 +111,74 @@ public void init() { // to mock. // In this case, default value in Setting will be returned all the time. doReturn(emptyList()).when(settings).getSettings(); - doReturn(false).when(settings).getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER); LocalClusterState.state().setPluginSettings(settings); - ActionFuture mockFuture = mock(ActionFuture.class); - when(client.execute(any(), any())).thenReturn(mockFuture); - - // Differentiate response for Scroll-1/2 by call count and scroll ID. - when(mockFuture.actionGet()) - .thenAnswer( - new Answer() { - private int callCnt; - - @Override - public SearchResponse answer(InvocationOnMock invocation) { - /* - * This works based on assumption that first call comes from Scroll-1, all the following calls come from Scroll-2. - * Because Scroll-1 only open scroll once and must be ahead of Scroll-2 which opens multiple times later. - */ - return callCnt++ == 0 ? response1 : response2; - } - }); - - doReturn(SCROLL_ID1).when(response1).getScrollId(); - doReturn(SCROLL_ID2).when(response2).getScrollId(); - - // Avoid NPE in empty SearchResponse - doReturn(0).when(response1).getFailedShards(); - doReturn(0).when(response2).getFailedShards(); - doReturn(false).when(response1).isTimedOut(); - doReturn(false).when(response2).isTimedOut(); - - returnMockResponse(SCROLL_ID1, response1); - returnMockResponse(SCROLL_ID2, response2); - Metrics.getInstance().registerDefaultMetrics(); } - private void returnMockResponse(String scrollId, SearchResponse response) { - SearchScrollRequestBuilder mockReqBuilder = mock(SearchScrollRequestBuilder.class); - when(client.prepareSearchScroll(scrollId)).thenReturn(mockReqBuilder); - when(mockReqBuilder.setScroll(any(TimeValue.class))).thenReturn(mockReqBuilder); - when(mockReqBuilder.get()).thenReturn(response); - } - - protected SearchHits query(String sql, MockSearchHits mockHits1, MockSearchHits mockHits2) { - doAnswer(mockHits1).when(response1).getHits(); - doAnswer(mockHits2).when(response2).getHits(); + protected SearchHits query(String sql, MockSearchResponse mockResponse1, MockSearchResponse mockResponse2) { + when(client.execute(eq(SearchAction.INSTANCE), any())).thenAnswer(invocation -> { + SearchRequest request = invocation.getArgument(1, SearchRequest.class); + ActionFuture mockFuture = mock(ActionFuture.class); + if (request.source().pointInTimeBuilder().getId().equals(PIT_ID1)) { + when(mockFuture.actionGet()).thenAnswer(mockResponse1); + } else { + when(mockFuture.actionGet()).thenAnswer(mockResponse2); + } + return mockFuture; + }); try (MockedStatic backOffRetryStrategyMocked = Mockito.mockStatic(BackOffRetryStrategy.class)) { backOffRetryStrategyMocked.when(BackOffRetryStrategy::isHealthy).thenReturn(true); - ClearScrollRequestBuilder mockReqBuilder = mock(ClearScrollRequestBuilder.class); - when(client.prepareClearScroll()).thenReturn(mockReqBuilder); - when(mockReqBuilder.addScrollId(any())).thenReturn(mockReqBuilder); - when(mockReqBuilder.get()) - .thenAnswer( - new Answer() { - @Override - public ClearScrollResponse answer(InvocationOnMock invocation) throws Throwable { - mockHits2.reset(); - return new ClearScrollResponse(true, 0); - } - }); + mockCreatePit(PIT_ID1, PIT_ID2); + mockDeletePit(mockResponse1, mockResponse2); List hits = plan(sql).execute(); return new SearchHits( hits.toArray(new SearchHit[0]), new TotalHits(hits.size(), Relation.EQUAL_TO), 0); + } catch (Exception e) { + throw new RuntimeException(e); } } + private void mockCreatePit(String pitId1, String pitId2) throws Exception { + ActionFuture actionFuture1 = mockCreatePitResponse(pitId1); + ActionFuture actionFuture2 = mockCreatePitResponse(pitId2); + when(client.execute(eq(CreatePitAction.INSTANCE), any(CreatePitRequest.class))) + .thenReturn(actionFuture1) + .thenReturn(actionFuture2); + } + + private ActionFuture mockCreatePitResponse(String pitId) throws Exception { + ActionFuture actionFuture = mock(ActionFuture.class); + CreatePitResponse createPitResponse = mock(CreatePitResponse.class); + when(createPitResponse.getId()).thenReturn(pitId); + when(actionFuture.get()).thenReturn(createPitResponse); + return actionFuture; + } + + private void mockDeletePit(MockSearchResponse response1, MockSearchResponse response2) throws Exception { + ActionFuture actionFuture = mock(ActionFuture.class); + DeletePitResponse deletePitResponse = mock(DeletePitResponse.class); + RestStatus restStatus = mock(RestStatus.class); + when(client.execute(eq(DeletePitAction.INSTANCE), any())) + .thenAnswer( + instance -> { + DeletePitRequest deletePitRequest = instance.getArgument(1, DeletePitRequest.class); + if (deletePitRequest.getPitIds().getFirst().equals(PIT_ID1)) { + response1.reset(); + } else if (deletePitRequest.getPitIds().getFirst().equals(PIT_ID2)) { + response2.reset(); + } + return actionFuture; + }); + when(actionFuture.get()).thenReturn(deletePitResponse); + when(deletePitResponse.status()).thenReturn(restStatus); + when(restStatus.getStatus()).thenReturn(200); + } + protected QueryPlanner plan(String sql) { SqlElasticRequestBuilder request = createRequestBuilder(sql); if (request instanceof HashJoinQueryPlanRequestBuilder) { @@ -207,8 +210,8 @@ private SQLExpr toSqlExpr(String sql) { return expr; } - /** Mock SearchHits and slice and return in batch. */ - protected static class MockSearchHits implements Answer { + /** Mock SearchResponse and return each batch in sequence */ + protected static class MockSearchResponse implements Answer { private final SearchHit[] allHits; @@ -216,13 +219,13 @@ protected static class MockSearchHits implements Answer { private int callCnt; - MockSearchHits(SearchHit[] allHits, int batchSize) { + MockSearchResponse(SearchHit[] allHits, int batchSize) { this.allHits = allHits; this.batchSize = batchSize; } @Override - public SearchHits answer(InvocationOnMock invocation) { + public SearchResponse answer(InvocationOnMock invocationOnMock) { SearchHit[] curBatch; if (isNoMoreBatch()) { curBatch = new SearchHit[0]; @@ -230,7 +233,14 @@ public SearchHits answer(InvocationOnMock invocation) { curBatch = currentBatch(); callCnt++; } - return new SearchHits(curBatch, new TotalHits(allHits.length, Relation.EQUAL_TO), 0); + + SearchResponse response = mock(SearchResponse.class); + when(response.getFailedShards()).thenReturn(0); + when(response.isTimedOut()).thenReturn(false); + when(response.getTotalShards()).thenReturn(1); + when(response.getHits()).thenReturn(new SearchHits(curBatch, new TotalHits(allHits.length, Relation.EQUAL_TO), 0)); + + return response; } private boolean isNoMoreBatch() { @@ -254,20 +264,20 @@ private void reset() { } } - protected MockSearchHits employees(SearchHit... mockHits) { + protected MockSearchResponse employees(SearchHit... mockHits) { return employees(5, mockHits); } - protected MockSearchHits employees(int pageSize, SearchHit... mockHits) { - return new MockSearchHits(mockHits, pageSize); + protected MockSearchResponse employees(int pageSize, SearchHit... mockHits) { + return new MockSearchResponse(mockHits, pageSize); } - protected MockSearchHits departments(SearchHit... mockHits) { + protected MockSearchResponse departments(SearchHit... mockHits) { return departments(5, mockHits); } - protected MockSearchHits departments(int pageSize, SearchHit... mockHits) { - return new MockSearchHits(mockHits, pageSize); + protected MockSearchResponse departments(int pageSize, SearchHit... mockHits) { + return new MockSearchResponse(mockHits, pageSize); } protected SearchHit employee(int docId, String lastname, String departmentId) { @@ -281,6 +291,7 @@ protected SearchHit employee(int docId, String lastname, String departmentId) { new BytesArray( "{\"lastname\":\"" + lastname + "\",\"departmentId\":\"" + departmentId + "\"}")); } + hit.sortValues(new Object[] {docId}, new DocValueFormat[] {DocValueFormat.RAW}); return hit; } @@ -293,6 +304,7 @@ protected SearchHit department(int docId, String id, String name) { } else { hit.sourceRef(new BytesArray("{\"id\":\"" + id + "\",\"name\":\"" + name + "\"}")); } + hit.sortValues(new Object[] {docId}, new DocValueFormat[] {DocValueFormat.RAW}); return hit; } } diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/query/DefaultQueryActionTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/query/DefaultQueryActionTest.java index d290e4dd5b0..44b660c7ef3 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/query/DefaultQueryActionTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/query/DefaultQueryActionTest.java @@ -144,11 +144,6 @@ public void testIfScrollShouldBeOpenWithDifferentFormats() { Mockito.verify(mockRequestBuilder).setSize(settingFetchSize); Mockito.verify(mockRequestBuilder).addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); - - // Verify setScroll when SQL_PAGINATION_API_SEARCH_AFTER is set to false - mockLocalClusterStateAndIntializeMetricsForScroll(timeValue); - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setScroll(timeValue); } @Test @@ -169,11 +164,6 @@ public void testIfScrollShouldBeOpen() { Mockito.verify(mockRequestBuilder).setSize(settingFetchSize); Mockito.verify(mockRequestBuilder).addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); - - // Verify setScroll when SQL_PAGINATION_API_SEARCH_AFTER is set to false - mockLocalClusterStateAndIntializeMetricsForScroll(timeValue); - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setScroll(timeValue); } @Test @@ -202,51 +192,6 @@ public void testIfScrollShouldBeOpenWithDifferentFetchSize() { Mockito.verify(mockRequestBuilder).setSize(20); Mockito.verify(mockRequestBuilder).addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); - - // Verify setScroll when SQL_PAGINATION_API_SEARCH_AFTER is set to false - mockLocalClusterStateAndIntializeMetricsForScroll(timeValue); - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setScroll(timeValue); - } - - @Test - public void testIfScrollShouldBeOpenWithDifferentValidFetchSizeAndLimit() { - TimeValue timeValue = new TimeValue(120000); - mockLocalClusterStateAndInitializeMetrics(timeValue); - - int limit = 2300; - doReturn(limit).when(mockSelect).getRowCount(); - SqlRequest mockSqlRequest = mock(SqlRequest.class); - - /** fetchSize <= LIMIT - open scroll */ - int userFetchSize = 1500; - doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); - doReturn(mockRequestBuilder).when(mockRequestBuilder).setSize(userFetchSize); - queryAction.setSqlRequest(mockSqlRequest); - queryAction.setFormat(Format.JDBC); - - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setSize(userFetchSize); - Mockito.verify(mockRequestBuilder).addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC); - // Skip setScroll when SQL_PAGINATION_API_SEARCH_AFTER is set to false - Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); - - /** fetchSize > LIMIT - no scroll */ - userFetchSize = 5000; - doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); - mockRequestBuilder = mock(SearchRequestBuilder.class); - queryAction.initialize(mockRequestBuilder); - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setSize(limit); - Mockito.verify(mockRequestBuilder, never()).setScroll(timeValue); - - // Verify setScroll when SQL_PAGINATION_API_SEARCH_AFTER is set to false - mockLocalClusterStateAndIntializeMetricsForScroll(timeValue); - /** fetchSize <= LIMIT - open scroll */ - userFetchSize = 1500; - doReturn(userFetchSize).when(mockSqlRequest).fetchSize(); - queryAction.checkAndSetScroll(); - Mockito.verify(mockRequestBuilder).setScroll(timeValue); } private void mockLocalClusterStateAndInitializeMetrics(TimeValue time) { @@ -257,24 +202,6 @@ private void mockLocalClusterStateAndInitializeMetrics(TimeValue time) { .when(mockLocalClusterState) .getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW); doReturn(2L).when(mockLocalClusterState).getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL); - doReturn(true) - .when(mockLocalClusterState) - .getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER); - - Metrics.getInstance().registerDefaultMetrics(); - } - - private void mockLocalClusterStateAndIntializeMetricsForScroll(TimeValue time) { - LocalClusterState mockLocalClusterState = mock(LocalClusterState.class); - LocalClusterState.state(mockLocalClusterState); - doReturn(time).when(mockLocalClusterState).getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE); - doReturn(3600L) - .when(mockLocalClusterState) - .getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW); - doReturn(2L).when(mockLocalClusterState).getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL); - doReturn(false) - .when(mockLocalClusterState) - .getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER); Metrics.getInstance().registerDefaultMetrics(); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index e4026f70ae1..b919c0845bc 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -95,11 +95,7 @@ public OpenSearchRequest build( int maxResultWindow, TimeValue cursorKeepAlive, OpenSearchClient client) { - if (this.settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) { - return buildRequestWithPit(indexName, maxResultWindow, cursorKeepAlive, client); - } else { - return buildRequestWithScroll(indexName, maxResultWindow, cursorKeepAlive); - } + return buildRequestWithPit(indexName, maxResultWindow, cursorKeepAlive, client); } private OpenSearchRequest buildRequestWithPit( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java index 0432f5197b2..fbe0198f7d0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/setting/OpenSearchSettings.java @@ -69,13 +69,6 @@ public class OpenSearchSettings extends Settings { Setting.Property.NodeScope, Setting.Property.Dynamic); - public static final Setting SQL_PAGINATION_API_SEARCH_AFTER_SETTING = - Setting.boolSetting( - Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue(), - true, - Setting.Property.NodeScope, - Setting.Property.Dynamic); - public static final Setting PPL_ENABLED_SETTING = Setting.boolSetting( Key.PPL_ENABLED.getKeyValue(), @@ -266,12 +259,6 @@ public OpenSearchSettings(ClusterSettings clusterSettings) { Key.SQL_DELETE_ENABLED, SQL_DELETE_ENABLED_SETTING, new Updater(Key.SQL_DELETE_ENABLED)); - register( - settingBuilder, - clusterSettings, - Key.SQL_PAGINATION_API_SEARCH_AFTER, - SQL_PAGINATION_API_SEARCH_AFTER_SETTING, - new Updater(Key.SQL_PAGINATION_API_SEARCH_AFTER)); register( settingBuilder, clusterSettings, @@ -456,7 +443,6 @@ public static List> pluginSettings() { .add(SQL_SLOWLOG_SETTING) .add(SQL_CURSOR_KEEP_ALIVE_SETTING) .add(SQL_DELETE_ENABLED_SETTING) - .add(SQL_PAGINATION_API_SEARCH_AFTER_SETTING) .add(PPL_ENABLED_SETTING) .add(DEFAULT_PATTERN_METHOD_SETTING) .add(QUERY_MEMORY_LIMIT_SETTING) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java index 74cbd1f1672..cbcd16762e7 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScan.java @@ -14,14 +14,12 @@ import lombok.ToString; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.exception.NoCursorException; import org.opensearch.sql.executor.pagination.PlanSerializer; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; -import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; import org.opensearch.sql.planner.SerializablePlan; @@ -124,15 +122,8 @@ public void readExternal(ObjectInput in) throws IOException { ((PlanSerializer.CursorDeserializationStream) in).resolveObject("engine"); client = engine.getClient(); - boolean pointInTimeEnabled = - Boolean.parseBoolean( - client.meta().get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue())); try (BytesStreamInput bsi = new BytesStreamInput(requestStream)) { - if (pointInTimeEnabled) { - request = new OpenSearchQueryRequest(bsi, engine); - } else { - request = new OpenSearchScrollRequest(bsi, engine); - } + request = new OpenSearchQueryRequest(bsi, engine); } maxResponseSize = in.readInt(); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java index e5cf94eb86b..d66877ed1d6 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngineTest.java @@ -174,7 +174,6 @@ void explain_successfully() { new OpenSearchExecutionEngine(client, protector, new PlanSerializer(null)); Settings settings = mock(Settings.class); when(settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE)).thenReturn(TimeValue.timeValueMinutes(1)); - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); OpenSearchExprValueFactory exprValueFactory = mock(OpenSearchExprValueFactory.class); final var name = new OpenSearchRequest.IndexName("test"); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java index 16f71a7b81b..d9f2ba88a4e 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/executor/protector/OpenSearchExecutionProtectorTest.java @@ -97,8 +97,6 @@ public void setup() { @Test void test_protect_indexScan() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); - String indexName = "test"; final int maxResultWindow = 10000; final int querySizeLimit = 200; diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java index 13c06681dab..7c42b9450fe 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilderTest.java @@ -85,9 +85,6 @@ class OpenSearchRequestBuilderTest { @BeforeEach void setup() { requestBuilder = new OpenSearchRequestBuilder(DEFAULT_LIMIT, exprValueFactory, settings); - lenient() - .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(true); lenient().when(settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)).thenReturn(false); } @@ -124,7 +121,6 @@ void build_query_request_push_down_size() { @Test void build_PIT_request_with_correct_size() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); when(client.createPit(any(CreatePitRequest.class))).thenReturn("samplePITId"); Integer limit = 0; Integer offset = 0; @@ -144,7 +140,6 @@ void build_PIT_request_with_correct_size() { @Test void buildRequestWithPit_pageSizeNull_sizeGreaterThanMaxResultWindow() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); when(client.createPit(any(CreatePitRequest.class))).thenReturn("samplePITId"); Integer limit = 600; Integer offset = 0; @@ -168,7 +163,6 @@ void buildRequestWithPit_pageSizeNull_sizeGreaterThanMaxResultWindow() { @Test void buildRequestWithPit_pageSizeNull_sizeLessThanMaxResultWindow() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(true); Integer limit = 400; Integer offset = 0; int requestedTotalSize = 400; @@ -221,107 +215,6 @@ void buildRequestWithPit_pageSizeNotNull_startFromNonZero() { }); } - @Test - void build_scroll_request_with_correct_size() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - Integer limit = 800; - Integer offset = 10; - requestBuilder.pushDownLimit(limit, offset); - requestBuilder.getSourceBuilder().fetchSource("a", "b"); - - assertEquals( - new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), - TimeValue.timeValueMinutes(1), - new SearchSourceBuilder() - .from(offset) - .size(MAX_RESULT_WINDOW - offset) - .timeout(DEFAULT_QUERY_TIMEOUT), - exprValueFactory, - List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); - } - - @Test - void buildRequestWithScroll_pageSizeNull_sizeGreaterThanMaxResultWindow() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - Integer limit = 600; - Integer offset = 0; - int requestedTotalSize = 600; - requestBuilder = new OpenSearchRequestBuilder(requestedTotalSize, exprValueFactory, settings); - requestBuilder.pushDownLimit(limit, offset); - - assertEquals( - new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), - TimeValue.timeValueMinutes(1), - new SearchSourceBuilder() - .from(offset) - .size(MAX_RESULT_WINDOW - offset) - .timeout(DEFAULT_QUERY_TIMEOUT), - exprValueFactory, - List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); - } - - @Test - void buildRequestWithScroll_pageSizeNull_sizeLessThanMaxResultWindow() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - Integer limit = 400; - Integer offset = 0; - int requestedTotalSize = 400; - requestBuilder = new OpenSearchRequestBuilder(requestedTotalSize, exprValueFactory, settings); - requestBuilder.pushDownLimit(limit, offset); - - assertEquals( - new OpenSearchQueryRequest( - new OpenSearchRequest.IndexName("test"), - new SearchSourceBuilder() - .from(offset) - .size(requestedTotalSize) - .timeout(DEFAULT_QUERY_TIMEOUT), - exprValueFactory, - List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); - } - - @Test - void buildRequestWithScroll_pageSizeNotNull_startFromZero() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - int pageSize = 200; - int offset = 0; - int limit = 400; - requestBuilder.pushDownPageSize(pageSize); - requestBuilder.pushDownLimit(limit, offset); - - assertEquals( - new OpenSearchScrollRequest( - new OpenSearchRequest.IndexName("test"), - TimeValue.timeValueMinutes(1), - new SearchSourceBuilder() - .from(offset) - .size(MAX_RESULT_WINDOW - offset) - .timeout(DEFAULT_QUERY_TIMEOUT), - exprValueFactory, - List.of()), - requestBuilder.build(indexName, MAX_RESULT_WINDOW, DEFAULT_QUERY_TIMEOUT, client)); - } - - @Test - void buildRequestWithScroll_pageSizeNotNull_startFromNonZero() { - when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)).thenReturn(false); - int pageSize = 200; - int offset = 100; - int limit = 400; - requestBuilder.pushDownPageSize(pageSize); - requestBuilder.pushDownLimit(limit, offset); - assertThrows( - UnsupportedOperationException.class, - () -> { - requestBuilder.build(indexName, 500, TimeValue.timeValueMinutes(1), client); - }); - } - @Test void test_push_down_query() { QueryBuilder query = QueryBuilders.termQuery("intA", 1); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 3f8a07f495d..6af5aaed94e 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -80,9 +80,6 @@ class OpenSearchIndexTest { @BeforeEach void setUp() { this.index = new OpenSearchIndex(client, settings, "test"); - lenient() - .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(true); lenient().when(settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)).thenReturn(true); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java index 6f923cf5c4d..30821fd90c1 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanPaginationTest.java @@ -56,9 +56,6 @@ void setup() { lenient() .when(settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE)) .thenReturn(TimeValue.timeValueMinutes(1)); - lenient() - .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(true); lenient().when(settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)).thenReturn(true); } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 5381c4a7a7b..0ead49c49fe 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -82,9 +82,6 @@ class OpenSearchIndexScanTest { @BeforeEach void setup() { - lenient() - .when(settings.getSettingValue(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER)) - .thenReturn(true); lenient().when(settings.getSettingValue(Settings.Key.FIELD_TYPE_TOLERANCE)).thenReturn(true); } @@ -109,43 +106,6 @@ void throws_no_cursor_exception() { } } - @SneakyThrows - @ParameterizedTest - @ValueSource(ints = {0, 150}) - void serialize(Integer numberOfIncludes) { - var searchSourceBuilder = new SearchSourceBuilder().size(4); - - var factory = mock(OpenSearchExprValueFactory.class); - var engine = mock(OpenSearchStorageEngine.class); - var index = mock(OpenSearchIndex.class); - when(engine.getClient()).thenReturn(client); - when(engine.getTable(any(), any())).thenReturn(index); - var includes = - Stream.iterate(1, i -> i + 1) - .limit(numberOfIncludes) - .map(i -> "column" + i) - .collect(Collectors.toList()); - var request = - new OpenSearchScrollRequest( - INDEX_NAME, CURSOR_KEEP_ALIVE, searchSourceBuilder, factory, includes); - request.setScrollId("valid-id"); - // make a response, so OpenSearchResponse::isEmpty would return true and unset needClean - var response = mock(SearchResponse.class); - when(response.getAggregations()).thenReturn(mock()); - var hits = mock(SearchHits.class); - when(response.getHits()).thenReturn(hits); - when(response.getScrollId()).thenReturn("valid-id"); - when(hits.getHits()).thenReturn(new SearchHit[] {mock()}); - request.search(null, (req) -> response); - - try (var indexScan = new OpenSearchIndexScan(client, QUERY_SIZE, request)) { - var planSerializer = new PlanSerializer(engine); - var cursor = planSerializer.convertToCursor(indexScan); - var newPlan = planSerializer.convertToPlan(cursor.toString()); - assertEquals(indexScan, newPlan); - } - } - @SneakyThrows @ParameterizedTest @ValueSource(ints = {0, 150}) @@ -157,9 +117,6 @@ void serialize_PIT(Integer numberOfIncludes) { var index = mock(OpenSearchIndex.class); when(engine.getClient()).thenReturn(client); when(engine.getTable(any(), any())).thenReturn(index); - Map map = mock(Map.class); - when(map.get(any(String.class))).thenReturn("true"); - when(client.meta()).thenReturn(map); var includes = Stream.iterate(1, i -> i + 1) .limit(numberOfIncludes) @@ -183,9 +140,6 @@ void serialize_PIT(Integer numberOfIncludes) { var cursor = planSerializer.convertToCursor(indexScan); var newPlan = planSerializer.convertToPlan(cursor.toString()); assertNotNull(newPlan); - - verify(client).meta(); - verify(map).get(Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER.getKeyValue()); } } From 9ea3f54c5de44930b5bf7ed58d6b7f26075176b7 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 24 Feb 2025 17:13:19 -0800 Subject: [PATCH 2/3] reformat Signed-off-by: Tomoyuki Morita --- .../sql/legacy/cursor/DefaultCursor.java | 3 +- .../executor/cursor/CursorCloseExecutor.java | 1 - .../format/PrettyFormatRestExecutor.java | 1 - .../executor/format/SelectResultSet.java | 3 -- .../executor/join/ElasticJoinExecutor.java | 1 - .../legacy/executor/multi/MinusExecutor.java | 1 - .../unittest/cursor/DefaultCursorTest.java | 1 - .../unittest/planner/QueryPlannerTest.java | 33 +++++++++++-------- .../storage/scan/OpenSearchIndexScanTest.java | 1 - 9 files changed, 20 insertions(+), 25 deletions(-) diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java index 2d6e1ddb7a9..e7955289561 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/cursor/DefaultCursor.java @@ -139,8 +139,7 @@ public String generateCursorId() { try { return objectMapper.writeValueAsString(sortFields); } catch (JsonProcessingException e) { - throw new RuntimeException( - "Failed to parse sort fields from JSON string.", e); + throw new RuntimeException("Failed to parse sort fields from JSON string.", e); } }); json.put(SORT_FIELDS, sortFieldValue); diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java index 9d83d616c78..1c26d10ab01 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorCloseExecutor.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.json.JSONException; import org.opensearch.OpenSearchException; -import org.opensearch.action.search.ClearScrollResponse; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; import org.opensearch.sql.legacy.cursor.CursorType; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java index 3d5ee39976a..9f613f68c33 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/PrettyFormatRestExecutor.java @@ -12,7 +12,6 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.search.SearchRequestBuilder; import org.opensearch.action.search.SearchResponse; -import org.opensearch.core.common.Strings; import org.opensearch.core.rest.RestStatus; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java index cf2dbc6c997..802f88b7ecb 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/format/SelectResultSet.java @@ -30,7 +30,6 @@ import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsRequest; import org.opensearch.action.admin.indices.mapping.get.GetFieldMappingsResponse; -import org.opensearch.action.search.ClearScrollResponse; import org.opensearch.common.document.DocumentField; import org.opensearch.core.common.Strings; import org.opensearch.search.SearchHit; @@ -41,7 +40,6 @@ import org.opensearch.search.aggregations.metrics.NumericMetricsAggregation; import org.opensearch.search.aggregations.metrics.Percentile; import org.opensearch.search.aggregations.metrics.Percentiles; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.cursor.Cursor; import org.opensearch.sql.legacy.cursor.DefaultCursor; import org.opensearch.sql.legacy.domain.ColumnTypeProvider; @@ -51,7 +49,6 @@ import org.opensearch.sql.legacy.domain.Query; import org.opensearch.sql.legacy.domain.Select; import org.opensearch.sql.legacy.domain.TableOnJoinSelect; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.esdomain.mapping.FieldMapping; import org.opensearch.sql.legacy.exception.SqlFeatureNotImplementedException; import org.opensearch.sql.legacy.executor.Format; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java index 145d9081956..bddb2c75485 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/join/ElasticJoinExecutor.java @@ -25,7 +25,6 @@ import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; import org.opensearch.sql.legacy.domain.Field; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; import org.opensearch.sql.legacy.metrics.MetricName; diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java index 931e0d3243c..153d43ca976 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/multi/MinusExecutor.java @@ -27,7 +27,6 @@ import org.opensearch.sql.legacy.domain.Where; import org.opensearch.sql.legacy.domain.hints.Hint; import org.opensearch.sql.legacy.domain.hints.HintType; -import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.exception.SqlParseException; import org.opensearch.sql.legacy.executor.ElasticHitsExecutor; import org.opensearch.sql.legacy.metrics.MetricName; diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java index 292f3ef88ca..5a5840b4469 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/cursor/DefaultCursorTest.java @@ -23,7 +23,6 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.search.builder.SearchSourceBuilder; -import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.legacy.cursor.CursorType; import org.opensearch.sql.legacy.cursor.DefaultCursor; import org.opensearch.sql.legacy.esdomain.LocalClusterState; diff --git a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java index 3394e5be5e0..6e9b1d67d7d 100644 --- a/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java +++ b/legacy/src/test/java/org/opensearch/sql/legacy/unittest/planner/QueryPlannerTest.java @@ -39,7 +39,6 @@ import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequestBuilder; import org.opensearch.cluster.ClusterName; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.ClusterSettings; @@ -116,17 +115,20 @@ public void init() throws Exception { Metrics.getInstance().registerDefaultMetrics(); } - protected SearchHits query(String sql, MockSearchResponse mockResponse1, MockSearchResponse mockResponse2) { - when(client.execute(eq(SearchAction.INSTANCE), any())).thenAnswer(invocation -> { - SearchRequest request = invocation.getArgument(1, SearchRequest.class); - ActionFuture mockFuture = mock(ActionFuture.class); - if (request.source().pointInTimeBuilder().getId().equals(PIT_ID1)) { - when(mockFuture.actionGet()).thenAnswer(mockResponse1); - } else { - when(mockFuture.actionGet()).thenAnswer(mockResponse2); - } - return mockFuture; - }); + protected SearchHits query( + String sql, MockSearchResponse mockResponse1, MockSearchResponse mockResponse2) { + when(client.execute(eq(SearchAction.INSTANCE), any())) + .thenAnswer( + invocation -> { + SearchRequest request = invocation.getArgument(1, SearchRequest.class); + ActionFuture mockFuture = mock(ActionFuture.class); + if (request.source().pointInTimeBuilder().getId().equals(PIT_ID1)) { + when(mockFuture.actionGet()).thenAnswer(mockResponse1); + } else { + when(mockFuture.actionGet()).thenAnswer(mockResponse2); + } + return mockFuture; + }); try (MockedStatic backOffRetryStrategyMocked = Mockito.mockStatic(BackOffRetryStrategy.class)) { @@ -159,7 +161,8 @@ private ActionFuture mockCreatePitResponse(String pitId) thro return actionFuture; } - private void mockDeletePit(MockSearchResponse response1, MockSearchResponse response2) throws Exception { + private void mockDeletePit(MockSearchResponse response1, MockSearchResponse response2) + throws Exception { ActionFuture actionFuture = mock(ActionFuture.class); DeletePitResponse deletePitResponse = mock(DeletePitResponse.class); RestStatus restStatus = mock(RestStatus.class); @@ -238,7 +241,9 @@ public SearchResponse answer(InvocationOnMock invocationOnMock) { when(response.getFailedShards()).thenReturn(0); when(response.isTimedOut()).thenReturn(false); when(response.getTotalShards()).thenReturn(1); - when(response.getHits()).thenReturn(new SearchHits(curBatch, new TotalHits(allHits.length, Relation.EQUAL_TO), 0)); + when(response.getHits()) + .thenReturn( + new SearchHits(curBatch, new TotalHits(allHits.length, Relation.EQUAL_TO), 0)); return response; } diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java index 0ead49c49fe..344682f0ce9 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexScanTest.java @@ -57,7 +57,6 @@ import org.opensearch.sql.opensearch.request.OpenSearchQueryRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequest; import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder; -import org.opensearch.sql.opensearch.request.OpenSearchScrollRequest; import org.opensearch.sql.opensearch.response.OpenSearchResponse; import org.opensearch.sql.opensearch.storage.OpenSearchIndex; import org.opensearch.sql.opensearch.storage.OpenSearchStorageEngine; From ca18a9b5e12a391ce80987074edd58bc651275e7 Mon Sep 17 00:00:00 2001 From: Tomoyuki Morita Date: Mon, 24 Feb 2025 17:43:28 -0800 Subject: [PATCH 3/3] Remove OpenSearchRequestBuilder.buildRequestWithScroll Signed-off-by: Tomoyuki Morita --- .../request/OpenSearchRequestBuilder.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java index b919c0845bc..22490803241 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/OpenSearchRequestBuilder.java @@ -132,32 +132,6 @@ private OpenSearchRequest buildRequestWithPit( } } - private OpenSearchRequest buildRequestWithScroll( - OpenSearchRequest.IndexName indexName, int maxResultWindow, TimeValue cursorKeepAlive) { - int size = requestedTotalSize; - FetchSourceContext fetchSource = this.sourceBuilder.fetchSource(); - List includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of(); - - if (pageSize == null) { - if (startFrom + size > maxResultWindow) { - sourceBuilder.size(maxResultWindow - startFrom); - return new OpenSearchScrollRequest( - indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); - } else { - sourceBuilder.from(startFrom); - sourceBuilder.size(requestedTotalSize); - return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, includes); - } - } else { - if (startFrom != 0) { - throw new UnsupportedOperationException("Non-zero offset is not supported with pagination"); - } - sourceBuilder.size(pageSize); - return new OpenSearchScrollRequest( - indexName, cursorKeepAlive, sourceBuilder, exprValueFactory, includes); - } - } - private String createPit( OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) { // Create PIT ID for request