[ESQL] Adds wiring for compute side of LIMIT BY command #143458
ncordon merged 17 commits into elastic:main
Pinging @elastic/es-analytical-engine (Team:Analytics)
Pull request overview
Adds compute-engine support for the ESQL LIMIT BY plan shape by threading grouping expressions through logical/physical Limit nodes and introducing a new compute operator to enforce per-group row limits, including transport-version gated serialization.
Changes:
- Extend `Limit` (logical) and `LimitExec` (physical) with `groupings` and add mixed-version serialization behavior for `LIMIT BY`.
- Wire planning/mapping to pass groupings through and execute `GroupedLimitOperator` when groupings are present.
- Introduce `GroupedLimitOperator` + `PositionKeyEncoder` with accompanying unit tests and register the new operator status writeable.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/LimitExecSerializationTests.java | Updates physical plan serialization tests for new groupings field + mixed-version behavior. |
| x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/logical/LimitSerializationTests.java | Updates logical plan serialization tests for new groupings field + mixed-version behavior. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java | Registers GroupedLimitOperator.Status in named writeables. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/Mapper.java | Passes Limit.groupings() into LimitExec. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/mapper/LocalMapper.java | Passes Limit.groupings() into LimitExec for local planning. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java | Plans LimitExec into LimitOperator vs GroupedLimitOperator based on groupings. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/LimitExec.java | Adds groupings to physical node and gates serialization by transport version. |
| x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/Limit.java | Adds groupings to logical node and gates serialization by transport version. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/PositionKeyEncoderTests.java | Adds coverage for composite key encoding semantics. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/GroupedLimitOperatorTests.java | Adds behavioral tests for grouped limiting and status fields. |
| x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/GroupedLimitOperatorStatusTests.java | Adds wire/xcontent tests for GroupedLimitOperator.Status. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/PositionKeyEncoder.java | Implements grouping-key encoding across channels/values. |
| x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/GroupedLimitOperator.java | Implements per-group limiting operator + status. |
| server/src/main/resources/transport/upper_bounds/9.4.csv | Updates the 9.4 transport upper bound to include esql_limit_by. |
| server/src/main/resources/transport/definitions/referable/esql_limit_by.csv | Adds referable transport version definition for esql_limit_by. |
    private final int limitPerGroup;
    private final PositionKeyEncoder keyEncoder;
    private final BigArrays bigArrays;
    private final BytesRefHashTable seenKeys;
It's worth a comment saying that you want this and not a BlockHash implementation because those expand multivalues and this doesn't. I could see a world where one day we have other implementations of BlockHash that don't expand keys, but today is not that day. This is a good way to do it.
Could you make the comment javadoc? It'll read nicer if you mouseover the item.
     * below 128 are encoded in a single byte, making this compact for the
     * small numbers typical of value counts and byte-array lengths.
     */
    private void writeVInt(int value) {
It would be nice to share logic with DefaultUnsortableTopNEncoder in these methods, as they do basically the same thing.
We could have a TopNEncoder.DEFAULT_UNSORTABLE inside this class? I've done that here: d2bd9f3. I'm not sure I like that we are calling into a TopN class from inside this one. Any thoughts?
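For readers unfamiliar with the encoding the javadoc above describes, here is a minimal sketch of a variable-length int codec: 7 payload bits per byte, with the high bit marking "more bytes follow", so values below 128 fit in one byte. `VIntSketch`, `encode`, and `decode` are hypothetical names for illustration only; this is not the PR's `writeVInt` nor the `DefaultUnsortableTopNEncoder` code.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical illustration of variable-length int encoding; not the PR's implementation. */
final class VIntSketch {
    /** Encodes value using 7 bits per byte; the high bit of each byte flags a continuation. */
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // While more than 7 significant bits remain, emit the low 7 bits with the high bit set.
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        // Final byte: high bit clear, so the decoder knows to stop.
        out.write(value);
        return out.toByteArray();
    }

    /** Decodes a single value produced by encode. */
    static int decode(byte[] bytes) {
        int result = 0;
        int shift = 0;
        for (byte b : bytes) {
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break; // last byte of this value
            }
            shift += 7;
        }
        return result;
    }
}
```

Small numbers such as value counts and byte-array lengths therefore usually cost a single byte, which is the compactness argument made in the javadoc.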
    BytesRef key = keyEncoder.encode(page, pos);
    long hashOrd = seenKeys.add(key);
    int count;
    long ord;
No need to change anything but TIL BytesRefHashTable.add returns >= 0 when a new key was added and (-1-id) which is < 0 when it was already present.
I found that quite clever when I read about it 😄
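The return convention mentioned in this thread can be sketched with a plain `HashMap`. This is only an illustration under the assumption that ids are assigned in insertion order; `SeenKeysSketch` and `toOrd` are hypothetical names, and the real `BytesRefHashTable` works on `BytesRef` keys rather than strings.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in that mimics the "id, or -1 - id if already present" convention. */
final class SeenKeysSketch {
    private final Map<String, Long> ids = new HashMap<>();

    /** Returns the new id (>= 0) when the key is added, or -1 - existingId (< 0) when already present. */
    long add(String key) {
        Long existing = ids.get(key);
        if (existing != null) {
            return -1 - existing;
        }
        long id = ids.size(); // assumption: ids are dense and assigned in insertion order
        ids.put(key, id);
        return id;
    }

    /** Recovers the id from either form of the return value. */
    static long toOrd(long addResult) {
        return addResult >= 0 ? addResult : -1 - addResult;
    }
}
```

A single call thus tells the caller both whether the key is new and which slot in a per-group counter array belongs to it.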
        this.counts = bigArrays.newIntArray(16, false);
    }

    public static final class Factory implements Operator.OperatorFactory {
Could you move the Factory up to the top? I get confused when the factory is below the ctor. Mostly when I'm scanning the files quickly.
            }
        }
        return result;
    }
I see we do it like 40 times in other places. It'd be lovely to have it in one place. But it'd be nice to dedupe in a follow-up change.
Yep, Ivan pointed out to me that we already have a filter method in Page
    @Override
    public void finish() {
        finished = true;
    }
For later: there are cases where we know the hash can never grow any larger - like, we know the number of unique values we're grouping on. In that case we could early terminate - just like LimitOperator does. Say you are doing:
FROM foo
| SORT @timestamp DESC
| LIMIT 10 BY hostname
It's quite possible to know the number of unique hostnames, especially if it's less than a million. The old aggs framework uses a thing called "global ordinals" for this. We don't want those because they are very expensive at unexpected times. But their existence proves that such counts are possible/easy/good.
That is for later though.
There was a problem hiding this comment.
👍 I've added it to the issue we have tracking things to polish LIMIT BY: https://github.com/elastic/esql-planning/issues/262
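A rough sketch of the early-termination idea from this thread, assuming the number of distinct groups is known up front (which the operator in this PR does not assume). `EarlyTerminationSketch`, `accept`, `isFinished`, and `knownGroupCount` are hypothetical names, and the limit is assumed to be at least 1.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: stop once every known group has reached its per-group limit. */
final class EarlyTerminationSketch {
    private final int limitPerGroup;   // assumed >= 1
    private final int knownGroupCount; // assumption: supplied by the caller, e.g. a known cardinality
    private final Map<String, Integer> counts = new HashMap<>();
    private int fullGroups = 0;

    EarlyTerminationSketch(int limitPerGroup, int knownGroupCount) {
        this.limitPerGroup = limitPerGroup;
        this.knownGroupCount = knownGroupCount;
    }

    /** Returns true if the row with this group key should be kept. */
    boolean accept(String key) {
        int seen = counts.getOrDefault(key, 0);
        if (seen >= limitPerGroup) {
            return false; // group already full
        }
        counts.put(key, seen + 1);
        if (seen + 1 == limitPerGroup) {
            fullGroups++; // this group just filled up
        }
        return true;
    }

    /** True once no further row can possibly be accepted. */
    boolean isFinished() {
        return fullGroups >= knownGroupCount;
    }
}
```

With `LIMIT 10 BY hostname` and a known set of hostnames, `isFinished()` would turn true as soon as each hostname has contributed 10 rows, so upstream work could stop early.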
I've addressed all of the feedback, just missing an opinion on this #143458 (comment) from @ivancea
    - public class GroupKeyEncoder {
    + public class GroupKeyEncoder implements Releasable {

          private static final DefaultUnsortableTopNEncoder encoder = TopNEncoder.DEFAULT_UNSORTABLE;
Not sure whether I like this
Looks good to me, and we reuse some code. Plus, having the concrete class declared there should help with inlining.
Is there something you don't like specifically?
The naming perhaps (we are calling something from TopN inside limit), but it's fine, I can live with it
Uhm yep, the namespacing seems odd, but I would live with it. Ideally, if we decide to change to a BlockHash or similar, that would be "fixed" (Fun, because BlockHash is in the aggregations package, so not perfect either! 😆)
Adds a compute engine operator, `GroupedLimitOperator`, for a new `LIMIT BY` ESQL command. `LIMIT N BY expr1, expr2, ...` retains at most N documents per group, where groups are defined by one or more grouping key expressions.

This should be a no-op because it's not used anywhere in the language yet.
Part of #112918, https://github.com/elastic/esql-planning/issues/238
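To make the described semantics concrete, here is a minimal sketch of "at most N rows per group" over in-memory rows. It is an illustration only: `LimitBySketch` and `limitBy` are hypothetical names, rows are modelled as lists of strings, and the real operator works on pages and encoded keys rather than Java collections.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of LIMIT N BY semantics; not the compute-engine operator. */
final class LimitBySketch {
    /** Keeps, in input order, at most limitPerGroup rows for each distinct combination of key columns. */
    static List<List<String>> limitBy(List<List<String>> rows, int limitPerGroup, int... keyColumns) {
        Map<List<String>, Integer> counts = new HashMap<>();
        List<List<String>> kept = new ArrayList<>();
        for (List<String> row : rows) {
            // Build the composite grouping key from the selected columns.
            List<String> key = new ArrayList<>();
            for (int column : keyColumns) {
                key.add(row.get(column));
            }
            // Count this row for its group and keep it only while the group is under the limit.
            int seen = counts.merge(key, 1, Integer::sum);
            if (seen <= limitPerGroup) {
                kept.add(row);
            }
        }
        return kept;
    }
}
```

For example, with rows `(a,1), (a,2), (b,3), (a,4)`, a limit of 2, and the first column as the key, the sketch keeps `(a,1), (a,2), (b,3)` and drops `(a,4)`.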