Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public String describe() {
* Pick a strategy for the {@link DataPartitioning#AUTO} partitioning.
*/
public static Function<Query, PartitioningStrategy> autoStrategy(int limit) {
return limit == NO_LIMIT ? Factory::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy;
return limit == NO_LIMIT ? LuceneSourceOperator::highSpeedAutoStrategy : Factory::lowOverheadAutoStrategy;
}

/**
Expand All @@ -133,90 +133,91 @@ private static PartitioningStrategy lowOverheadAutoStrategy(Query query) {
return SHARD;
}

/**
* Select the {@link PartitioningStrategy} based on the {@link Query}.
* <ul>
* <li>
* If the {@linkplain Query} matches <strong>no</strong> documents then this will
* use the {@link PartitioningStrategy#SHARD} strategy so we minimize the overhead
* of finding nothing.
* </li>
* <li>
* If the {@linkplain Query} matches <strong>all</strong> documents then this will
* use the {@link PartitioningStrategy#DOC} strategy because the overhead of using
* that strategy for {@link MatchAllDocsQuery} is very low, and we need as many CPUs
* as we can get to process all the documents.
* </li>
* <li>
* Otherwise use the {@link PartitioningStrategy#SEGMENT} strategy because it's
* overhead is generally low.
* </li>
* </ul>
*/
private static PartitioningStrategy highSpeedAutoStrategy(Query query) {
Query unwrapped = unwrap(query);
log.trace("highSpeedAutoStrategy {} {}", query, unwrapped);
return switch (unwrapped) {
case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q);
case MatchAllDocsQuery q -> DOC;
case MatchNoDocsQuery q -> SHARD;
default -> SEGMENT;
};
}
}

private static Query unwrap(Query query) {
while (true) {
switch (query) {
case BoostQuery q: {
query = q.getQuery();
break;
}
case ConstantScoreQuery q: {
query = q.getQuery();
break;
}
default:
return query;
/**
* Select the {@link PartitioningStrategy} based on the {@link Query}.
* <ul>
* <li>
* If the {@linkplain Query} matches <strong>no</strong> documents then this will
* use the {@link PartitioningStrategy#SHARD} strategy so we minimize the overhead
* of finding nothing.
* </li>
* <li>
* If the {@linkplain Query} matches <strong>all</strong> documents then this will
* use the {@link PartitioningStrategy#DOC} strategy because the overhead of using
* that strategy for {@link MatchAllDocsQuery} is very low, and we need as many CPUs
* as we can get to process all the documents.
* </li>
* <li>
* Otherwise use the {@link PartitioningStrategy#SEGMENT} strategy because it's
* overhead is generally low.
* </li>
* </ul>
*/
public static PartitioningStrategy highSpeedAutoStrategy(Query query) {
Query unwrapped = unwrap(query);
log.trace("highSpeedAutoStrategy {} {}", query, unwrapped);
return switch (unwrapped) {
case BooleanQuery q -> highSpeedAutoStrategyForBoolean(q);
case MatchAllDocsQuery q -> DOC;
case MatchNoDocsQuery q -> SHARD;
default -> SEGMENT;
};
}

private static Query unwrap(Query query) {
while (true) {
switch (query) {
case BoostQuery q: {
query = q.getQuery();
break;
}
case ConstantScoreQuery q: {
query = q.getQuery();
break;
}
default:
return query;
}
}
}

/**
* Select the {@link PartitioningStrategy} for a {@link BooleanQuery}.
* <ul>
* <li>
* If the query can't match anything, returns {@link PartitioningStrategy#SEGMENT}.
* </li>
*
* </ul>
*/
private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) {
List<PartitioningStrategy> clauses = new ArrayList<>(query.clauses().size());
boolean allRequired = true;
for (BooleanClause c : query) {
Query clauseQuery = unwrap(c.query());
log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery);
if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery)
|| (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) {
// Can't match anything
return SHARD;
}
allRequired &= c.isRequired();
clauses.add(highSpeedAutoStrategy(clauseQuery));
}
log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses);
if (allRequired == false) {
return SEGMENT;
}
if (clauses.stream().anyMatch(s -> s == SHARD)) {
/**
* Select the {@link PartitioningStrategy} for a {@link BooleanQuery}.
* <ul>
* <li>
* If the query can't match anything, returns {@link PartitioningStrategy#SEGMENT}.
* </li>
*
* </ul>
*/
private static PartitioningStrategy highSpeedAutoStrategyForBoolean(BooleanQuery query) {
List<PartitioningStrategy> clauses = new ArrayList<>(query.clauses().size());
boolean allRequired = true;
for (BooleanClause c : query) {
Query clauseQuery = unwrap(c.query());
log.trace("highSpeedAutoStrategyForBooleanClause {} {}", c.occur(), clauseQuery);
if ((c.isProhibited() && clauseQuery instanceof MatchAllDocsQuery)
|| (c.isRequired() && clauseQuery instanceof MatchNoDocsQuery)) {
// Can't match anything
return SHARD;
}
if (clauses.stream().anyMatch(s -> s == SEGMENT)) {
return SEGMENT;
}
assert clauses.stream().allMatch(s -> s == DOC);
return DOC;
allRequired &= c.isRequired();
clauses.add(highSpeedAutoStrategy(clauseQuery));
}
log.trace("highSpeedAutoStrategyForBooleanClause {} {}", allRequired, clauses);
if (allRequired == false) {
return SEGMENT;
}
if (clauses.stream().anyMatch(s -> s == SHARD)) {
return SHARD;
}
if (clauses.stream().anyMatch(s -> s == SEGMENT)) {
return SEGMENT;
}
assert clauses.stream().allMatch(s -> s == DOC);
return DOC;
}

@SuppressWarnings("this-escape")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public Factory(
IndexedByShardId<? extends ShardContext> contexts,
Function<ShardContext, List<LuceneSliceQueue.QueryAndTags>> queryFunction,
DataPartitioning dataPartitioning,
DataPartitioning.AutoStrategy autoStrategy,
int taskConcurrency,
int maxPageSize,
int limit,
Expand All @@ -73,7 +74,9 @@ public Factory(
contexts,
queryFunction,
dataPartitioning,
query -> LuceneSliceQueue.PartitioningStrategy.SHARD,
dataPartitioning == DataPartitioning.AUTO ? autoStrategy.pickStrategy(limit) : query -> {
throw new UnsupportedOperationException("locked in " + dataPartitioning);
},
taskConcurrency,
limit,
needsScore,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
new IndexedByShardIdFromSingleton<>(ctx),
queryFunction,
dataPartitioning,
LuceneSourceOperator.Factory::autoStrategy,
taskConcurrency,
maxPageSize,
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public Optional<SortAndFormats> buildSort(List<SortBuilder<?>> sorts) {
new IndexedByShardIdFromSingleton<>(ctx),
queryFunction,
dataPartitioning,
LuceneSourceOperator.Factory::autoStrategy,
taskConcurrency,
maxPageSize,
limit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.hamcrest.Matcher;
import org.junit.ClassRule;

Expand Down Expand Up @@ -62,7 +63,7 @@ public static Iterable<Object[]> parameters() {
for (String index : new String[] { "idx", "small_idx" }) {
for (Case c : new Case[] {
new Case("", "SHARD"),
new Case("| SORT @timestamp ASC", "SHARD"),
new Case("| SORT @timestamp ASC", "DOC"),
new Case("| WHERE ABS(a) == 1", "DOC"),
new Case("| WHERE a == 1", "SHARD"),
new Case("| STATS SUM(a)", "DOC"),
Expand All @@ -73,18 +74,18 @@ public static Iterable<Object[]> parameters() {
new Case("| WHERE QSTR(\"a:1\")", "SHARD"),
new Case("| WHERE KQL(\"a:1\")", "SHARD"),
new Case("| WHERE a:\"1\"", "SHARD"),
new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SHARD", true),
new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SHARD", true),
new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SHARD", true),
new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SHARD", true), }) {
new Case("| WHERE MATCH(a, \"2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE QSTR(\"a:2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE KQL(\"a:2\") | SORT _score DESC", "SEGMENT", true),
new Case("| WHERE MATCH(a, \"3\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
new Case("| WHERE MATCH(a, \"3\") OR MATCH(a, \"4\") | SORT _score DESC | LIMIT 10", "SEGMENT", true),
new Case("| WHERE a:\"3\" | WHERE a:\"4\" | SORT _score DESC | LIMIT 10", "SEGMENT", true), }) {
params.add(
new Object[] {
defaultDataPartitioning,
index,
"FROM " + index + (c.score ? " METADATA _score " : " ") + c.suffix,
expectedPartition(defaultDataPartitioning, index, c.idxPartition) }
expectedPartition(defaultDataPartitioning, index, c.idxPartition, c.score) }
);
}
}
Expand Down Expand Up @@ -132,15 +133,19 @@ private void setDefaultDataPartitioning(String defaultDataPartitioning) throws I
assertThat(code, equalTo(200));
}

private static Matcher<String> expectedPartition(String defaultDataPartitioning, String index, String idxPartition) {
private static Matcher<String> expectedPartition(String defaultDataPartitioning, String index, String idxPartition, boolean score) {
return switch (defaultDataPartitioning) {
case null -> expectedAutoPartition(index, idxPartition);
case "auto" -> expectedAutoPartition(index, idxPartition);
case null -> expectedAutoPartition(index, idxPartition, score);
case "auto" -> expectedAutoPartition(index, idxPartition, score);
default -> equalTo(defaultDataPartitioning.toUpperCase(Locale.ROOT));
};
}

private static Matcher<String> expectedAutoPartition(String index, String idxPartition) {
private static Matcher<String> expectedAutoPartition(String index, String idxPartition, boolean score) {
boolean lateMaterializationEnabled = EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled();
if (score && lateMaterializationEnabled == false) {
return equalTo("SHARD");
}
return equalTo(switch (index) {
case "idx" -> idxPartition;
case "small_idx" -> "SHARD";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.compute.aggregation.GroupingAggregator;
import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.compute.lucene.IndexedByShardId;
import org.elasticsearch.compute.lucene.LuceneCountOperator;
import org.elasticsearch.compute.lucene.LuceneOperator;
Expand Down Expand Up @@ -69,6 +70,7 @@
import org.elasticsearch.search.lookup.SourceFilter;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.xpack.esql.action.EsqlCapabilities;
import org.elasticsearch.xpack.esql.core.expression.Attribute;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
Expand Down Expand Up @@ -333,12 +335,14 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
* references to the same underlying data, but we're being a bit paranoid here.
*/
estimatedPerRowSortSize *= 2;

// LuceneTopNSourceOperator does not support QueryAndTags, if there are multiple queries or if the single query has tags,
// UnsupportedOperationException will be thrown by esQueryExec.query()
luceneFactory = new LuceneTopNSourceOperator.Factory(
shardContexts,
querySupplier(esQueryExec.query()),
context.queryPragmas().dataPartitioning(plannerSettings.defaultDataPartitioning()),
topNAutoStrategy(),
context.queryPragmas().taskConcurrency(),
context.pageSize(esQueryExec, rowEstimatedSize),
limit,
Expand Down Expand Up @@ -373,6 +377,16 @@ public final PhysicalOperation sourcePhysicalOperation(EsQueryExec esQueryExec,
return PhysicalOperation.fromSource(luceneFactory, layout.build());
}

private static DataPartitioning.AutoStrategy topNAutoStrategy() {
return unusedLimit -> {
if (EsqlCapabilities.Cap.ENABLE_REDUCE_NODE_LATE_MATERIALIZATION.isEnabled()) {
// Use high speed strategy for TopN - we want to parallelize searches as much as possible given the query structure
return LuceneSourceOperator::highSpeedAutoStrategy;
}
return query -> LuceneSliceQueue.PartitioningStrategy.SHARD;
};
}

List<ValuesSourceReaderOperator.FieldInfo> extractFields(FieldExtractExec fieldExtractExec) {
List<Attribute> attributes = fieldExtractExec.attributesToExtract();
List<ValuesSourceReaderOperator.FieldInfo> fieldInfos = new ArrayList<>(attributes.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public SumOverTimeTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> te

@ParametersFactory
public static Iterable<Object[]> parameters() {
return SumTests.parameters();
return SumTests.testParameters(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public SumTests(@Name("TestCase") Supplier<TestCaseSupplier.TestCase> testCaseSu

@ParametersFactory
public static Iterable<Object[]> parameters() {
return testParameters(true);
}

static Iterable<Object[]> testParameters(boolean includeDenseVector) {
var suppliers = new ArrayList<TestCaseSupplier>();

Stream.of(
Expand Down Expand Up @@ -80,25 +84,28 @@ public static Iterable<Object[]> parameters() {
DataType.DOUBLE,
equalTo(200.)
)
),
new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> {
var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
randomDouble(),
randomDouble(),
randomDouble(),
randomNonNegativeInt()
);
return new TestCaseSupplier.TestCase(
List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")),
standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE),
DataType.DOUBLE,
equalTo(value.sum())
);

})
)
)
);

if (includeDenseVector) {
suppliers.add(new TestCaseSupplier(List.of(DataType.AGGREGATE_METRIC_DOUBLE), () -> {
var value = new AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral(
randomDouble(),
randomDouble(),
randomDouble(),
randomNonNegativeInt()
);
return new TestCaseSupplier.TestCase(
List.of(TestCaseSupplier.TypedData.multiRow(List.of(value), DataType.AGGREGATE_METRIC_DOUBLE, "field")),
standardAggregatorName("Sum", DataType.AGGREGATE_METRIC_DOUBLE),
DataType.DOUBLE,
equalTo(value.sum())
);

}));
}

return parameterSuppliersFromTypedDataWithDefaultChecks(suppliers);
}

Expand Down