ESQL: Added GroupedTopNOperator for LIMIT BY, compute only#143476
ESQL: Added GroupedTopNOperator for LIMIT BY, compute only#143476ivancea merged 23 commits intoelastic:mainfrom
Conversation
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
There was a problem hiding this comment.
Pull request overview
Adds compute-layer support for grouped Top-N (to back ESQL LIMIT … BY) by introducing a new GroupedTopNOperator and the supporting key-encoding and queueing machinery, along with substantial test refactors/additions to validate grouped behavior.
Changes:
- Introduce
GroupedTopNOperator+GroupedQueue/GroupedRowto maintain per-group Top-N results in compute. - Add
PositionKeyEncoderfor stable composite group-key encoding (including MV “list semantics”). - Refactor/extend TopN tests and test block sources to support both grouped and ungrouped Top-N, and add new grouped-specific tests and status serialization tests.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperator.java | New grouped Top-N operator implementation and factory. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java | Per-group priority queue management backed by BigArrays. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedRow.java | Encoded row container with breaker accounting and shard ref-count tracking. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedRowFiller.java | Extract/encode sort keys + values from pages into GroupedRow. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperatorStatus.java | New operator status with group_count and XContent/wire support. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PositionKeyEncoder.java | Composite group-key encoder used to map group keys to group IDs. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java | Refactors to support grouped/un-grouped variants and adds helpers/utilities. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperatorTests.java | New grouped-specific test suite extending TopN tests. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperatorStatusTests.java | Wire/XContent tests for grouped status. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedRowTests.java | Memory accounting and close semantics tests for grouped rows. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedQueueTests.java | Behavioral + memory usage tests for grouped queue logic. |
| x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TypedAbstractBlockSourceBuilder.java | Test-only base class to expose elementTypes() from block source operators. |
| x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/operator/blocksource/TupleAbstractBlockSourceOperator.java | Updated to expose element types via new typed base class. |
| x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/operator/blocksource/ListRowsBlockSourceOperator.java | Updated to expose element types and allow overriding block builder creation. |
| x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/TestBlockBuilder.java | Adds DocBlockBuilder for doc blocks with shard ref counters in tests. |
| test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java | Adds randomLong(long max) helper for tests. |
| libs/core/src/main/java/org/elasticsearch/core/Releasables.java | Allows nullable argument for closeExpectNoException(Releasable). |
Comments suppressed due to low confidence (2)
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java:1708
- In the output-draining loop, the page blocks are only released after
readAsRows(...)completes. If an assertion insidereadAsRows(or any other exception) is thrown, the page’s blocks won’t be released, which can cascade into breaker/memory-leak failures that obscure the original test failure. Prefer using try-with-resources (or a try/finally) around thePagereturned bygetOutput()to guaranteeclose()/releaseBlocks()runs even on failure.
while (operator.isFinished() == false) {
Page p = operator.getOutput();
assertThat(operator.needsInput(), equalTo(false));
if (p != null) {
readAsRows(actualValues, p);
p.releaseBlocks();
}
x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperatorTests.java:610
computeTopN(...)currently instantiates a newGroupedTopNOperatorTestsjust to call an instance helper. Constructing a new randomized test case instance during a test can consume random numbers unexpectedly and bypass the normal JUnit/RandomizedRunner lifecycle, which can lead to subtle flakiness. Consider extracting the grouping/sorting helper into a static method (or a small utility) so it can be called directly without creating a new test instance.
private static List<? extends List<?>> computeTopN(
List<? extends List<?>> inputValues,
int groupChannel,
int sortChannel,
int limit,
boolean ascendingOrder
) {
List<List<Object>> singleValueInput = new ArrayList<>();
for (List<?> row : inputValues) {
List<Object> rowAsObject = row.stream().map(v -> (Object) v).toList();
singleValueInput.add(rowAsObject);
}
List<SortOrder> sortOrders = List.of(new SortOrder(sortChannel, ascendingOrder, false));
return new GroupedTopNOperatorTests().computeTopN(singleValueInput, List.of(groupChannel), sortOrders, limit);
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
ncordon
left a comment
There was a problem hiding this comment.
It looks good to me overall, the thing I'd like us to consider is whether we can avoid some more code duplication
.../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java
Outdated
Show resolved
Hide resolved
.../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java
Show resolved
Hide resolved
.../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java
Show resolved
Hide resolved
.../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java
Show resolved
Hide resolved
.../plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedQueue.java
Show resolved
Hide resolved
...ck/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedRow.java
Show resolved
Hide resolved
...ck/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedRow.java
Outdated
Show resolved
Hide resolved
...gin/esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedRowFiller.java
Outdated
Show resolved
Hide resolved
...plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PositionKeyEncoder.java
Outdated
Show resolved
Hide resolved
...in/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/GroupedQueueTests.java
Outdated
Show resolved
Hide resolved
83fdcf2 to
980e633
Compare
980e633 to
ee45151
Compare
There was a problem hiding this comment.
To be removed when #143458 is merged (Renamed to GroupKeyEncoder)
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(keys); |
There was a problem hiding this comment.
equals compares keys.bytesRefView() content, but hashCode() uses keys (the builder object), which can violate the equals/hashCode contract (equal rows may have different hash codes). Compute the hash from the same bytes used by equals, e.g. keys.bytesRefView().hashCode() (or an equivalent content-based hash).
| return Objects.hashCode(keys); | |
| return keys.bytesRefView().hashCode(); |
There was a problem hiding this comment.
@nik9000 this was like this in the original TopN.Row (keys is a BreakingBytesRefBuilder, which doesn't implement equals() nor hashCode()):
Is it a bug, or there was a reason for that?
I can change it, as I don't think we use equality anyway (Maybe in tests, if something)
There was a problem hiding this comment.
It's a bug, yeah. We use equals but not hashCode so we didn't notice it.
.../esql/compute/src/main/java/org/elasticsearch/compute/operator/topn/GroupedTopNOperator.java
Show resolved
Hide resolved
...mpute/test/src/main/java/org/elasticsearch/compute/test/TypedAbstractBlockSourceBuilder.java
Outdated
Show resolved
Hide resolved
nik9000
left a comment
There was a problem hiding this comment.
Left a couple of small things. Looks good though.
| private final long receiveNanos; | ||
| private final long emitNanos; | ||
| private final int occupiedRows; | ||
| private final long groupCount; |
There was a problem hiding this comment.
This can only be an int I think. I think the long type comes from the array thingy, but we're only using the int part.
There was a problem hiding this comment.
Realistically yes. But it comes as a long from the hashTable.size(). Furthermore, blockhashes return long ids too.
So I'm a bit conflicted. I would prefer having this as a long playing along with the rest of the components, instead of explicitly casting it to int so it matches the operator practical limits.
I'm not sure which piece would fail first if we reach +MAX_INT elements, but I would keep those extra bytes just in case
There was a problem hiding this comment.
I suppose it's fine. We're not really in a place to have all those elements in memory at the moment. We'd CircuitBreak long before we get there. But one day we'll push to disk. And, in that case, we might want long here.
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(keys); |
There was a problem hiding this comment.
It's a bug, yeah. We use equals but not hashCode so we didn't notice it.
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hashCode(keys); |
| if (o instanceof BytesRefHashTable h) { | ||
| return h.ramBytesUsed(); | ||
| } | ||
| return super.accumulateObject(o, shallowSize, fieldValues, queue); |
There was a problem hiding this comment.
I think this probably needs to be returned by a function. It's not immutable so making it shared like this is bound to be weird.
There was a problem hiding this comment.
Are you talking about the Accumulator anonymous class, or this condition?
If it's the class, it's stateless (the parent and this one). As the parent doesn't depend on us, I'll change it to a method anyway as a safety net
There was a problem hiding this comment.
Ignore my comment. It is immutable. I thought it wasn't because it's named Accumulator - which, to me, means "thing you accumulate into" not "how you accumulate into my parameters"
| if (o instanceof BytesRefHashTable h) { | ||
| return h.ramBytesUsed(); | ||
| } | ||
| return super.accumulateObject(o, shallowSize, fieldValues, queue); |
There was a problem hiding this comment.
Ignore my comment. It is immutable. I thought it wasn't because it's named Accumulator - which, to me, means "thing you accumulate into" not "how you accumulate into my parameters"
# Conflicts: # test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java # x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java
| @Override | ||
| public long ramBytesUsed() { | ||
| long size = SHALLOW_SIZE; | ||
| size += RamUsageEstimator.sizeOf(groupChannels); |
There was a problem hiding this comment.
I think we were double accounting for this here and in the GroupedTopNOperator, I've changed it when I've renamed PositionKeyEncoder to GroupKeyEncoder
…locations * upstream/main: (126 commits) Update KnnIndexTester to use more settings from datasets (elastic#143869) fix: dynamic template vector array is overridden by automatic dense_vector mapping (elastic#143733) ES|QL: Don't reuse the same alias for _fork column (elastic#143909) Close and initialize clients after each node upgrade in logsdb rolling upgrade tests. (elastic#143823) ESQL: Added GroupedTopNOperator for LIMIT BY, compute only (elastic#143476) Handle views in ResolveIndexAction (elastic#143561) Improve reindex rethrottle API in stateless (elastic#143771) Use a copy of the SearchExecutionContext for each Percolator execution (elastic#142765) Log the stacktrace when we encounter a deprecation warning for `default_metric` (elastic#143929) ESQL: evaluate ReferenceAttributes to potentially FieldAttributes for full-text functions restriction (elastic#143893) Add ClusterStateSerializationStats Serializatation Tests (elastic#142703) Adds Coordination Diagnostics Tests (elastic#142709) Upgrade Elasticsearch to Apache Lucene 10.4 (elastic#141882) ESQL: Add configurable bracket-based multi-value support for CSV reader (elastic#143890) time series es819 binary dv use up to a 1mb block size (elastic#143049) Dynamically enable / disable plugins in correspondence to stateless mode. (elastic#142147) ES|QL: Implement first/last_over_time for tdigest (elastic#143832) Document CHANGE_POINT limitation (elastic#143877) Fix OperationsOnSeqNoDisabledIndicesIT (elastic#143892) [Test] Test that sequence numbers are not pruned with retention lease (elastic#143825) ...
Operator added in #143476 Similar structure as TopNBenchmark, with similar parameters to keep the comparison consistent. Benchmark results for some similar cases (Compared with TopN): ``` TopNBenchmark Benchmark (data) (sortedInput) (topCount) Mode Cnt Score Error Units TopNBenchmark.run longs_asc_and_bytes_refs_asc false 10 avgt 7 56.292 ± 0.922 ns/op TopNBenchmark.run longs_asc_and_bytes_refs_asc false 1000 avgt 7 54.866 ± 1.091 ns/op GroupedTopNBenchmark Benchmark (data) (groupCount) (groupKeys) (topCount) Mode Cnt Score Error Units GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 1 longs 10 avgt 7 103.886 ± 10.041 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 1 longs 1000 avgt 7 103.321 ± 5.021 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 100 longs 10 avgt 7 118.975 ± 7.685 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 100 longs 1000 avgt 7 205.589 ± 30.058 ns/op ```
Operator added in elastic#143476 Similar structure as TopNBenchmark, with similar parameters to keep the comparison consistent. Benchmark results for some similar cases (Compared with TopN): ``` TopNBenchmark Benchmark (data) (sortedInput) (topCount) Mode Cnt Score Error Units TopNBenchmark.run longs_asc_and_bytes_refs_asc false 10 avgt 7 56.292 ± 0.922 ns/op TopNBenchmark.run longs_asc_and_bytes_refs_asc false 1000 avgt 7 54.866 ± 1.091 ns/op GroupedTopNBenchmark Benchmark (data) (groupCount) (groupKeys) (topCount) Mode Cnt Score Error Units GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 1 longs 10 avgt 7 103.886 ± 10.041 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 1 longs 1000 avgt 7 103.321 ± 5.021 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 100 longs 10 avgt 7 118.975 ± 7.685 ns/op GroupedTopNBenchmark.run longs_asc_and_bytes_refs_asc 100 longs 1000 avgt 7 205.589 ± 30.058 ns/op ```
Extracted from #140019