Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
setup:
- skip:
features:
- headers
- do:
indices.create:
index: test
body:
settings:
max_result_window: 1
- do:
bulk:
index: test
refresh: true
body:
- '{"index": {}}'
- '{"id": 1}'
- '{"index": {}}'
- '{"id": 2}'
- '{"index": {}}'
- '{"id": 3}'


---
teardown:
- do:
query.settings:
body:
transient:
plugins.calcite.enabled : false
plugins.calcite.fallback.allowed : true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, no need revert setting, by default is disabled

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this teardown step? it seems not enable calcite in setup step.

@penghuo is each xyx.yml independent?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need revert setting, by default is disabled


---
"Prevent push down limit if the offset exceeds max_result_window":
- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: 'source=test | head 1 from 1 '
- match: {"total": 1}
- match: {"schema": [{"name": "id", "type": "bigint"}]}
- match: {"datarows": [[2]]}

- do:
headers:
Content-Type: 'application/json'
ppl:
body:
query: 'source=test | head 2 | head 1 from 1 '
- match: { "total": 1 }
- match: { "schema": [ { "name": "id", "type": "bigint" } ] }
- match: { "datarows": [ [ 2 ] ] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the result without this fixing?

Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,23 @@ public class OpenSearchRequestBuilder {
@EqualsAndHashCode.Exclude @ToString.Exclude
private final OpenSearchExprValueFactory exprValueFactory;

@EqualsAndHashCode.Exclude @ToString.Exclude private final int maxResultWindow;

private int startFrom = 0;

@ToString.Exclude private final Settings settings;

public static class PushDownUnSupportedException extends RuntimeException {
public PushDownUnSupportedException(String message) {
super(message);
}
}

/** Constructor. */
public OpenSearchRequestBuilder(OpenSearchExprValueFactory exprValueFactory, Settings settings) {
public OpenSearchRequestBuilder(
OpenSearchExprValueFactory exprValueFactory, int maxResultWindow, Settings settings) {
this.settings = settings;
this.maxResultWindow = maxResultWindow;
this.sourceBuilder =
new SearchSourceBuilder()
.from(startFrom)
Expand All @@ -89,16 +99,12 @@ public OpenSearchRequestBuilder(OpenSearchExprValueFactory exprValueFactory, Set
* @return query request with PIT or scroll request
*/
public OpenSearchRequest build(
OpenSearchRequest.IndexName indexName,
int maxResultWindow,
TimeValue cursorKeepAlive,
OpenSearchClient client) {
return build(indexName, maxResultWindow, cursorKeepAlive, client, false);
OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) {
return build(indexName, cursorKeepAlive, client, false);
}

public OpenSearchRequest build(
OpenSearchRequest.IndexName indexName,
int maxResultWindow,
TimeValue cursorKeepAlive,
OpenSearchClient client,
boolean isMappingEmpty) {
Expand All @@ -109,14 +115,11 @@ public OpenSearchRequest build(
if (sourceBuilder.size() == 0 || isMappingEmpty) {
return new OpenSearchQueryRequest(indexName, sourceBuilder, exprValueFactory, List.of());
}
return buildRequestWithPit(indexName, maxResultWindow, cursorKeepAlive, client);
return buildRequestWithPit(indexName, cursorKeepAlive, client);
}

private OpenSearchRequest buildRequestWithPit(
OpenSearchRequest.IndexName indexName,
int maxResultWindow,
TimeValue cursorKeepAlive,
OpenSearchClient client) {
OpenSearchRequest.IndexName indexName, TimeValue cursorKeepAlive, OpenSearchClient client) {
int size = requestedTotalSize;
FetchSourceContext fetchSource = this.sourceBuilder.fetchSource();
List<String> includes = fetchSource != null ? Arrays.asList(fetchSource.includes()) : List.of();
Expand Down Expand Up @@ -218,10 +221,20 @@ public void pushDownLimit(Integer limit, Integer offset) {
// Besides, there may be cases when the existing requestedTotalSize does not satisfy the
// new limit and offset. E.g. for `head 11 | head 10 from 2`, the new requested total size
// is 9. We need to adjust it accordingly.
requestedTotalSize = Math.min(limit, requestedTotalSize - offset);
int newRequestedTotalSize = Math.min(limit, requestedTotalSize - offset);
// If there are multiple offset, we aggregate the offset
// E.g. for `head 10 from 1 | head 5 from 2` equals to `head 5 from 3`
startFrom += offset;
int newStartFrom = startFrom + offset;

if (newStartFrom >= maxResultWindow) {
throw new PushDownUnSupportedException(
String.format(
"Requested offset %d should be less than the max result window %d",
newStartFrom, maxResultWindow));
}

requestedTotalSize = newRequestedTotalSize;
startFrom = newStartFrom;
sourceBuilder.from(startFrom).size(requestedTotalSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,7 @@ public TableScanBuilder createScanBuilder() {
client,
requestBuilder.getMaxResponseSize(),
requestBuilder.build(
indexName,
getMaxResultWindow(),
cursorKeepAlive,
client,
cachedFieldOpenSearchTypes.isEmpty()));
indexName, cursorKeepAlive, client, cachedFieldOpenSearchTypes.isEmpty()));
return new OpenSearchIndexScanBuilder(builder, createScanOperator);
}

Expand Down Expand Up @@ -258,16 +254,12 @@ public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) {
}

public OpenSearchRequestBuilder createRequestBuilder() {
return new OpenSearchRequestBuilder(createExprValueFactory(), settings);
return new OpenSearchRequestBuilder(createExprValueFactory(), getMaxResultWindow(), settings);
}

public OpenSearchRequest buildRequest(OpenSearchRequestBuilder requestBuilder) {
final TimeValue cursorKeepAlive = settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
return requestBuilder.build(
indexName,
getMaxResultWindow(),
cursorKeepAlive,
client,
cachedFieldOpenSearchTypes.isEmpty());
indexName, cursorKeepAlive, client, cachedFieldOpenSearchTypes.isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.sql.ast.tree.Sort;
import org.opensearch.sql.common.utils.StringUtils;
Expand All @@ -23,6 +25,7 @@
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.function.OpenSearchFunctions;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder;
import org.opensearch.sql.opensearch.request.OpenSearchRequestBuilder.PushDownUnSupportedException;
import org.opensearch.sql.opensearch.storage.script.filter.FilterQueryBuilder;
import org.opensearch.sql.opensearch.storage.script.sort.SortQueryBuilder;
import org.opensearch.sql.opensearch.storage.serialization.DefaultExpressionSerializer;
Expand All @@ -41,6 +44,7 @@
@VisibleForTesting
@EqualsAndHashCode
class OpenSearchIndexScanQueryBuilder implements PushDownQueryBuilder {
private static final Logger LOG = LogManager.getLogger(OpenSearchIndexScanQueryBuilder.class);

final OpenSearchRequestBuilder requestBuilder;

Expand Down Expand Up @@ -71,8 +75,18 @@ public boolean pushDownSort(LogicalSort sort) {

@Override
public boolean pushDownLimit(LogicalLimit limit) {
requestBuilder.pushDownLimit(limit.getLimit(), limit.getOffset());
return true;
try {
requestBuilder.pushDownLimit(limit.getLimit(), limit.getOffset());
return true;
} catch (PushDownUnSupportedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Cannot pushdown limit {} with offset {}", limit.getLimit(), limit.getOffset(), e);
} else {
LOG.info("Cannot pushdown limit {} with offset {}", limit.getLimit(), limit.getOffset());
}
return false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,12 @@ void explain_successfully() {
OpenSearchExprValueFactory exprValueFactory = mock(OpenSearchExprValueFactory.class);
final var name = new OpenSearchRequest.IndexName("test");
final int maxResultWindow = 10000;
final var requestBuilder = new OpenSearchRequestBuilder(exprValueFactory, settings);
final var requestBuilder =
new OpenSearchRequestBuilder(exprValueFactory, maxResultWindow, settings);
PhysicalPlan plan =
new OpenSearchIndexScan(
mock(OpenSearchClient.class),
requestBuilder.build(
name, maxResultWindow, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client));
requestBuilder.build(name, settings.getSettingValue(SQL_CURSOR_KEEP_ALIVE), client));

AtomicReference<ExplainResponse> result = new AtomicReference<>();
executor.explain(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,8 @@ void test_protect_indexScan() {

final var name = new OpenSearchRequest.IndexName(indexName);
final var request =
new OpenSearchRequestBuilder(exprValueFactory, settings)
.build(
name,
maxResultWindow,
settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE),
client);
new OpenSearchRequestBuilder(exprValueFactory, maxResultWindow, settings)
.build(name, settings.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE), client);
assertEquals(
PhysicalPlanDSL.project(
PhysicalPlanDSL.limit(
Expand Down
Loading
Loading