Skip to content
Merged
7 changes: 7 additions & 0 deletions docs/changelog/142700.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
area: ES|QL
issues:
- 140134
- 141083
pr: 142700
summary: Do not push sort on many keyword fields to lucene
type: bug
3 changes: 0 additions & 3 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ tests:
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackSubqueryIT
method: testGiantTextFieldInSubqueryIntermediateResultsWithSort
issue: https://github.com/elastic/elasticsearch/issues/141034
- class: org.elasticsearch.xpack.esql.heap_attack.HeapAttackSubqueryIT
method: testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortManyFields
issue: https://github.com/elastic/elasticsearch/issues/141083
- class: org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointServiceNodeTests
method: testGetCheckpointStats
issue: https://github.com/elastic/elasticsearch/issues/141112
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ void initManyBigFieldsIndex(int docs, String type, boolean random) throws IOExce
int docsPerBulk = 5;
int fields = 1000;
int fieldSize = Math.toIntExact(ByteSizeValue.ofKb(1).getBytes());
boolean numeric = type.equalsIgnoreCase("integer") || type.equalsIgnoreCase("long") || type.equalsIgnoreCase("double");

Request request = new Request("PUT", "/manybigfields");
XContentBuilder config = JsonXContent.contentBuilder().startObject();
Expand All @@ -755,10 +756,14 @@ void initManyBigFieldsIndex(int docs, String type, boolean random) throws IOExce
} else {
bulk.append(", ");
}
bulk.append('"').append("f").append(String.format(Locale.ROOT, "%03d", f)).append("\": \"");
// if requested, generate random string to hit the CBE faster
bulk.append(random ? randomAlphaOfLength(1024) : Integer.toString(f % 10).repeat(fieldSize));
bulk.append('"');
bulk.append('"').append("f").append(String.format(Locale.ROOT, "%03d", f)).append("\": ");
if (numeric) {
bulk.append(randomNumericValue(type));
} else {
bulk.append('"');
bulk.append(random ? randomAlphaOfLength(1024) : Integer.toString(f % 10).repeat(fieldSize));
bulk.append('"');
}
}
bulk.append("}\n");
if (d % docsPerBulk == docsPerBulk - 1 && d != docs - 1) {
Expand All @@ -769,6 +774,15 @@ void initManyBigFieldsIndex(int docs, String type, boolean random) throws IOExce
initIndex("manybigfields", bulk.toString());
}

/**
 * Produces a random literal for the given numeric field type, rendered as a
 * string suitable for embedding directly into a JSON bulk-request body.
 *
 * @param type the mapping type name ("integer", "long" or "double"), case-insensitive
 * @return the random value formatted as a plain string
 * @throws IllegalArgumentException if {@code type} is not one of the supported numeric types
 */
private static String randomNumericValue(String type) {
    String normalized = type.toLowerCase(Locale.ROOT);
    if (normalized.equals("integer")) {
        return Integer.toString(randomInt());
    }
    if (normalized.equals("long")) {
        return Long.toString(randomLong());
    }
    if (normalized.equals("double")) {
        return Double.toString(randomDouble());
    }
    throw new IllegalArgumentException("unsupported numeric type: " + type);
}

void initGiantTextField(int docs, boolean includeId, long fieldSizeInMb) throws IOException {
int docsPerBulk = isServerless() ? 3 : 10;
logger.info("loading many documents with one big text field - docs per bulk {}", docsPerBulk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import org.apache.lucene.tests.util.TimeUnits;
import org.elasticsearch.Build;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.test.ListMatcher;
import org.junit.Before;

Expand Down Expand Up @@ -86,9 +87,6 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResults() throws IO
* This is mainly to test TopNOperator, addInput triggers CBE.
*/
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortOneField() throws IOException {
if (isServerless()) { // 500 docs OOM in serverless
return;
}
int docs = 500; // 500MB random/unique keyword values
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true);
for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) {
Expand All @@ -102,27 +100,46 @@ public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortOneF
* This is mainly to test TopNOperator.
*/
public void testManyRandomKeywordFieldsInSubqueryIntermediateResultsWithSortManyFields() throws IOException {
if (isServerless()) { // both 100 and 500 docs OOM in serverless
return;
}
int docs = 500; // // 500MB random/unique keyword values
int docs = 500; // 500MB random/unique keyword values
heapAttackIT.initManyBigFieldsIndex(docs, "keyword", true);
// Some data points:
// 1. Sort on 999 fields, with 500 * 999 random values, without subquery fail/OOM in lucene, LeafFieldComparator
// 2. Sort on 20 fields(500*20 random values), 2 subqueries trigger CBE, 8 subqueries trigger OOM, haven't found a workaround yet.
StringBuilder sortKeys = new StringBuilder();
sortKeys.append("f000");
for (int f = 1; f < 100; f++) {
sortKeys.append(", f").append(String.format(Locale.ROOT, "%03d", f));
}
// TODO skip 8 subqueries with sort 100 fields, as it OOMs, seems like the constraint is in reading data from lucene,
// LuceneTopNSourceOperator.NonScoringPerShardCollector is the main memory consumer,
// MultiLeafFieldComparator seems big but it is only about 15% of the size of NonScoringPerShardCollector,
for (int subquery : List.of(DEFAULT_SUBQUERIES)) {
for (int subquery : List.of(DEFAULT_SUBQUERIES, MAX_SUBQUERIES)) {
assertCircuitBreaks(attempt -> buildSubqueriesWithSort(subquery, "manybigfields", sortKeys.toString()));
}
}

/**
 * Sorts on 100 numeric fields across subqueries. Unlike the keyword variants above,
 * numeric sorts are cheap enough that a full result may come back; the test accepts
 * either a successful response (columns match the index mapping) or a 429
 * circuit_breaking_exception (expected in memory-constrained serverless environments).
 */
public void testManyRandomNumericFieldsInSubqueryIntermediateResultsWithSortManyFields() throws IOException {
int docs = 1000;
// exercise all three numeric types supported by initManyBigFieldsIndex/randomNumericValue
String type = randomFrom("integer", "long", "double");
heapAttackIT.initManyBigFieldsIndex(docs, type, true);
// sort keys f000..f099 — 100 of the 1000 generated fields
StringBuilder sortKeys = new StringBuilder();
sortKeys.append("f000");
for (int f = 1; f < 100; f++) {
sortKeys.append(", f").append(String.format(Locale.ROOT, "%03d", f));
}
// expected column metadata covers all 1000 fields, each typed as the chosen numeric type
ListMatcher columns = matchesList();
for (int f = 0; f < 1000; f++) {
columns = columns.item(matchesMap().entry("name", "f" + String.format(Locale.ROOT, "%03d", f)).entry("type", type));
}
for (int subquery : List.of(MAX_SUBQUERIES)) {
// results are returned from non-serverless environment, but CBE is expected in serverless
try {
Map<?, ?> response = buildSubqueriesWithSort(subquery, "manybigfields", sortKeys.toString());
assertMap(response, matchesMap().entry("columns", columns));
} catch (ResponseException e) {
// serverless path: verify the failure is specifically a circuit-breaker trip, not some other error
Map<?, ?> map = responseAsMap(e.getResponse());
assertMap(
map,
matchesMap().entry("status", 429).entry("error", matchesMap().extraOk().entry("type", "circuit_breaking_exception"))
);
}
}
}

/*
* The index's size is 1MB * 500, each field has 500 unique/random text values, and these queries don't have aggregation or sort.
* CBE is not triggered here.
Expand Down Expand Up @@ -363,6 +380,6 @@ private Map<String, Object> buildSubqueriesWithSort(int subqueries, String index
query.append(", ").append(subquery);
}
query.append(" \"}");
return responseAsMap(query(query.toString(), "columns,values"));
return responseAsMap(query(query.toString(), "columns"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
import org.elasticsearch.xpack.esql.core.expression.NameId;
import org.elasticsearch.xpack.esql.core.expression.ReferenceAttribute;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.expression.Foldables;
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.expression.function.scalar.spatial.BinarySpatialFunction;
Expand Down Expand Up @@ -69,7 +70,8 @@ protected PhysicalPlan rule(TopNExec topNExec, LocalPhysicalOptimizerContext ctx
ctx.plannerSettings(),
ctx.foldCtx(),
topNExec,
LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags())
LucenePushdownPredicates.from(ctx.searchStats(), ctx.flags()),
resolveMaxKeywordSortFields(ctx)
);
return pushable.rewrite(topNExec);
}
Expand Down Expand Up @@ -135,13 +137,15 @@ private static Pushable evaluatePushable(
PlannerSettings plannerSettings,
FoldContext ctx,
TopNExec topNExec,
LucenePushdownPredicates lucenePushdownPredicates
LucenePushdownPredicates lucenePushdownPredicates,
int maxKeywordSortFields
) {
PhysicalPlan child = topNExec.child();
if (child instanceof EsQueryExec queryExec
&& queryExec.canPushSorts()
&& canPushDownOrders(topNExec.order(), lucenePushdownPredicates)
&& canPushLimit(topNExec, plannerSettings)) {
&& canPushLimit(topNExec, plannerSettings)
&& tooManyKeywordSortFields(topNExec.order(), maxKeywordSortFields) == false) {
// With the simplest case of `FROM index | SORT ...` we only allow pushing down if the sort is on a field
return new PushableQueryExec(queryExec);
}
Expand Down Expand Up @@ -205,7 +209,7 @@ && canPushLimit(topNExec, plannerSettings)) {
break;
}
}
if (pushableSorts.isEmpty() == false) {
if (pushableSorts.isEmpty() == false && tooManyKeywordFieldSorts(pushableSorts, maxKeywordSortFields) == false) {
return new PushableCompoundExec(evalExec, queryExec, pushableSorts);
}
}
Expand Down Expand Up @@ -237,4 +241,48 @@ private static List<EsQueryExec.Sort> buildFieldSorts(List<Order> orders) {
}
return sorts;
}

/**
 * Resolves the effective maximum number of keyword sort fields for Lucene pushdown.
 * A query-level pragma set to a non-negative value wins; otherwise the
 * cluster-level planner setting is used.
 */
private static int resolveMaxKeywordSortFields(LocalPhysicalOptimizerContext ctx) {
    int fromPragma = ctx.configuration().pragmas().maxKeywordSortFields();
    if (fromPragma >= 0) {
        return fromPragma;
    }
    return ctx.plannerSettings().maxKeywordSortFields();
}

/**
 * Checks whether the given orders contain more keyword {@link FieldAttribute}
 * sort keys than {@code maxKeywordSortFields} allows. Used on the simple
 * pushdown path where orders reference field attributes directly.
 *
 * @return {@code true} if the keyword sort-field count exceeds the limit
 */
private static boolean tooManyKeywordSortFields(List<Order> orders, int maxKeywordSortFields) {
    int keywordSorts = 0;
    for (Order order : orders) {
        boolean sortsOnKeywordField = order.child() instanceof FieldAttribute fa && fa.dataType() == DataType.KEYWORD;
        if (sortsOnKeywordField == false) {
            continue;
        }
        keywordSorts++;
        if (keywordSorts > maxKeywordSortFields) {
            // short-circuit: one over the limit is enough to reject pushdown
            return true;
        }
    }
    return false;
}

/**
 * Returns {@code true} if the number of keyword {@link EsQueryExec.FieldSort} entries in the given sorts
 * exceeds {@code maxKeywordSortFields}. Used on the compound pushdown path.
 */
private static boolean tooManyKeywordFieldSorts(List<EsQueryExec.Sort> sorts, int maxKeywordSortFields) {
int count = 0;
for (EsQueryExec.Sort sort : sorts) {
// only FieldSort entries sorting on KEYWORD count toward the limit; other Sort kinds are ignored
// NOTE(review): method name "resulType" looks like a typo for "resultType" — confirm against the EsQueryExec.FieldSort API
if (sort instanceof EsQueryExec.FieldSort fs && fs.resulType() == DataType.KEYWORD) {
// short-circuit as soon as the limit is crossed
if (++count > maxKeywordSortFields) {
return true;
}
}
}
return false;
}

}
Loading