Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public enum Key {
SQL_ENABLED("plugins.sql.enabled"),
SQL_SLOWLOG("plugins.sql.slowlog"),
SQL_CURSOR_KEEP_ALIVE("plugins.sql.cursor.keep_alive"),
SQL_PAGINATION_API_SEARCH_AFTER("plugins.sql.pagination.api"),

/** PPL Settings. */
PPL_ENABLED("plugins.ppl.enabled"),
Expand Down
44 changes: 0 additions & 44 deletions docs/user/admin/settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -160,50 +160,6 @@ Result set::
}
}

plugins.sql.pagination.api
================================

Description
-----------

This setting controls whether the SQL search queries in OpenSearch use Point-In-Time (PIT) with search_after or the traditional scroll mechanism for fetching paginated results.

1. Default Value: true
2. Possible Values: true or false
3. When set to true, the search query in the background uses PIT with search_after instead of scroll to retrieve paginated results. The Cursor Id returned to the user will encode relevant pagination query-related information, which will be used to fetch the subsequent pages of results.
4. This setting is node-level.
5. This setting can be updated dynamically.


Example
-------

You can update the setting with a new value like this.

SQL query::

>> curl -H 'Content-Type: application/json' -X PUT localhost:9200/_plugins/_query/settings -d '{
"transient" : {
"plugins.sql.pagination.api" : "true"
}
}'

Result set::

{
"acknowledged" : true,
"persistent" : { },
"transient" : {
"plugins" : {
"sql" : {
"pagination" : {
"api" : "true"
}
}
}
}
}

plugins.query.size_limit
===========================

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ private Settings defaultSettings() {
private final Map<Key, Object> defaultSettings =
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.put(Key.FIELD_TYPE_TOLERANCE, true)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ private Settings defaultSettings() {
new ImmutableMap.Builder<Key, Object>()
.put(Key.QUERY_SIZE_LIMIT, 200)
.put(Key.SQL_CURSOR_KEEP_ALIVE, TimeValue.timeValueMinutes(1))
.put(Key.SQL_PAGINATION_API_SEARCH_AFTER, true)
.put(Key.FIELD_TYPE_TOLERANCE, true)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package org.opensearch.sql.legacy.cursor;

import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -38,7 +37,6 @@
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.format.Schema;

/**
Expand Down Expand Up @@ -133,24 +131,20 @@ public String generateCursorId() {
json.put(INDEX_PATTERN, indexPattern);
json.put(SCHEMA_COLUMNS, getSchemaAsJson());
json.put(FIELD_ALIAS_MAP, fieldAliasMap);
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
json.put(PIT_ID, pitId);
String sortFieldValue =
AccessController.doPrivileged(
(PrivilegedAction<String>)
() -> {
try {
return objectMapper.writeValueAsString(sortFields);
} catch (JsonProcessingException e) {
throw new RuntimeException(
"Failed to parse sort fields from JSON string.", e);
}
});
json.put(SORT_FIELDS, sortFieldValue);
setSearchRequestString(json, searchSourceBuilder);
} else {
json.put(SCROLL_ID, scrollId);
}
json.put(PIT_ID, pitId);
String sortFieldValue =
AccessController.doPrivileged(
(PrivilegedAction<String>)
() -> {
try {
return objectMapper.writeValueAsString(sortFields);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse sort fields from JSON string.", e);
}
});
json.put(SORT_FIELDS, sortFieldValue);
setSearchRequestString(json, searchSourceBuilder);

return String.format("%s:%s", type.getId(), encodeCursor(json));
}

Expand All @@ -169,9 +163,7 @@ private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder s
}

private boolean isCursorIdNullOrEmpty() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
return Strings.isNullOrEmpty(pitId);
}

public static DefaultCursor from(String cursorId) {
Expand All @@ -184,11 +176,7 @@ public static DefaultCursor from(String cursorId) {
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
cursor.setIndexPattern(json.getString(INDEX_PATTERN));
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
populateCursorForPit(json, cursor);
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
populateCursorForPit(json, cursor);
cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS)));
cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,17 @@

import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.transport.client.Client;
Expand Down Expand Up @@ -67,36 +63,20 @@ public SearchResponse getResponseWithHits(
request.setSize(size);
SearchResponse responseWithHits;

if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
// Set sort field for search_after
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
request.addSort(METADATA_FIELD_ID, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
// from and size is alternate method to paginate result.
// If select has from clause, search after is not required.
if (previousResponse != null && select.getFrom().isEmpty()) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();
} else {
// Set scroll
TimeValue keepAlive = LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE);
if (previousResponse != null) {
responseWithHits =
client
.prepareSearchScroll(previousResponse.getScrollId())
.setScroll(keepAlive)
.execute()
.actionGet();
} else {
request.setScroll(keepAlive);
responseWithHits = request.get();
}
// Set sort field for search_after
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
request.addSort(METADATA_FIELD_ID, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
// from and size is alternate method to paginate result.
// If select has from clause, search after is not required.
if (previousResponse != null && select.getFrom().isEmpty()) {
request.searchAfter(previousResponse.getHits().getSortFields());
}
responseWithHits = request.get();

return responseWithHits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,16 @@
package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.sql.legacy.cursor.CursorType;
import org.opensearch.sql.legacy.cursor.DefaultCursor;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
Expand Down Expand Up @@ -83,26 +80,14 @@ public String execute(Client client, Map<String, String> params) throws Exceptio
}

private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) {
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
try {
pit.delete();
return SUCCEEDED_TRUE;
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
} else {
String scrollId = cursor.getScrollId();
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
try {
pit.delete();
return SUCCEEDED_TRUE;
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -111,19 +110,15 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE);

SearchResponse scrollResponse = null;
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
SearchSourceBuilder source = cursor.getSearchSourceBuilder();
source.searchAfter(cursor.getSortFields());
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(source);
scrollResponse = client.search(searchRequest).actionGet();
} else {
String previousScrollId = cursor.getScrollId();
scrollResponse =
client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get();
}

String pitId = cursor.getPitId();
SearchSourceBuilder source = cursor.getSearchSourceBuilder();
source.searchAfter(cursor.getSortFields());
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(source);
scrollResponse = client.search(searchRequest).actionGet();

SearchHits searchHits = scrollResponse.getHits();
SearchHit[] searchHitArray = searchHits.getHits();
String newScrollId = scrollResponse.getScrollId();
Expand Down Expand Up @@ -173,17 +168,13 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
}

cursor.setRowsLeft(rowsLeft);
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
cursor.setPitId(newPitId);
cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder());
cursor.setSortFields(
scrollResponse
.getHits()
.getAt(scrollResponse.getHits().getHits().length - 1)
.getSortValues());
} else {
cursor.setScrollId(newScrollId);
}
cursor.setPitId(newPitId);
cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder());
cursor.setSortFields(
scrollResponse
.getHits()
.getAt(scrollResponse.getHits().getHits().length - 1)
.getSortValues());
Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor);
return protocol.cursorFormat();
}
Expand Down
Loading
Loading