Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
8251782
ESQL: Added GroupedTopNOperator for LIMIT BY, compute only
ivancea Mar 3, 2026
467ffbb
Format
ivancea Mar 3, 2026
75f84c7
Fixes failing tests
ncordon Mar 4, 2026
ee45151
Shares more code
ncordon Mar 4, 2026
93cd5c6
Merge branch 'main' into grouped-topn-operator
ncordon Mar 4, 2026
21afee5
Merge branch 'main' into grouped-topn-operator
ncordon Mar 4, 2026
b1d2469
Merge branch 'main' into grouped-topn-operator
ncordon Mar 5, 2026
b694a8c
Simplified grouped topn test
ivancea Mar 5, 2026
aa96d78
Make static test methods non-static
ivancea Mar 5, 2026
e4d7e11
Merge branch 'main' into grouped-topn-operator
ivancea Mar 5, 2026
905b80a
Simplified tests removing redundant methods
ivancea Mar 5, 2026
7c57e43
Reorder private method
ivancea Mar 5, 2026
e453d22
Merge branch 'main' into grouped-topn-operator
ivancea Mar 5, 2026
0752772
Merge branch 'main' into grouped-topn-operator
ivancea Mar 6, 2026
ed995a0
Javadoc and renames, and removed outdated test
ivancea Mar 6, 2026
f5f96fb
Rename test operator
ivancea Mar 6, 2026
bce5c45
[CI] Auto commit changes from spotless
Mar 6, 2026
476e635
Fix double closing and simplify GroupedQueue tests
ivancea Mar 6, 2026
7af7541
Changed status groupCount to int and fixed TopNRow hashCode
ivancea Mar 9, 2026
2055ff8
Merge branch 'main' into grouped-topn-operator
ivancea Mar 9, 2026
b727c7f
Changes PositionKeyEncoder by GroupKeyEncoder
ncordon Mar 10, 2026
a7649ad
Removed redundant groupKeys to simplify double accounting
ivancea Mar 10, 2026
d4b993e
Merge branch 'main' into grouped-topn-operator
ivancea Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public static void closeExpectNoException(Releasable... releasables) {
}

/** Release the provided {@link Releasable} expecting no exception to by thrown. */
public static void closeExpectNoException(Releasable releasable) {
public static void closeExpectNoException(@Nullable Releasable releasable) {
try {
close(releasable);
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,11 @@ public static long randomLong() {
return random().nextLong();
}

/** A random long from 0..max (inclusive). */
public static long randomLong(long max) {
Comment thread
ivancea marked this conversation as resolved.
return RandomNumbers.randomLongBetween(random(), 0L, max);
}

public static LongStream randomLongs() {
return random().longs();
}
Expand Down

@ivancea ivancea Mar 5, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be removed when #143458 is merged (Renamed to GroupKeyEncoder)

Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.DoubleBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;

import java.util.List;

/**
* Encodes the values at a given position across multiple blocks into a single {@link BytesRef} composite key.
* Multivalued positions are serialized with list semantics: the value count is written first, then each value
* in block iteration order. This means {@code [1, 2]} and {@code [2, 1]} produce different keys.
* Null positions are encoded as a value count of zero.
*/
public class PositionKeyEncoder implements Accountable {
Comment thread
ivancea marked this conversation as resolved.
Outdated

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(PositionKeyEncoder.class);

private final int[] groupChannels;
private final ElementType[] elementTypes;
private final BytesRefBuilder scratch = new BytesRefBuilder();
private final BytesRef scratchBytesRef = new BytesRef();

public PositionKeyEncoder(int[] groupChannels, List<ElementType> elementTypes) {
this.groupChannels = groupChannels;
this.elementTypes = new ElementType[groupChannels.length];
for (int i = 0; i < groupChannels.length; i++) {
this.elementTypes[i] = elementTypes.get(groupChannels[i]);
}
}

/**
* Encode the group key for the given position from the page into a {@link BytesRef}.
* The returned reference is only valid until the next call to {@code encode}.
*/
public BytesRef encode(Page page, int position) {
scratch.clear();
for (int i = 0; i < groupChannels.length; i++) {
Block block = page.getBlock(groupChannels[i]);
encodeBlock(block, elementTypes[i], position);
}
return scratch.get();
}

private void encodeBlock(Block block, ElementType type, int position) {
if (block.isNull(position)) {
writeVInt(0);
return;
}
int firstValueIndex = block.getFirstValueIndex(position);
int valueCount = block.getValueCount(position);
writeVInt(valueCount);
switch (type) {
case INT -> {
IntBlock b = (IntBlock) block;
for (int v = 0; v < valueCount; v++) {
writeInt(b.getInt(firstValueIndex + v));
}
}
case LONG -> {
LongBlock b = (LongBlock) block;
for (int v = 0; v < valueCount; v++) {
writeLong(b.getLong(firstValueIndex + v));
}
}
case DOUBLE -> {
DoubleBlock b = (DoubleBlock) block;
for (int v = 0; v < valueCount; v++) {
writeLong(Double.doubleToLongBits(b.getDouble(firstValueIndex + v)));
}
}
case FLOAT -> {
FloatBlock b = (FloatBlock) block;
for (int v = 0; v < valueCount; v++) {
writeInt(Float.floatToIntBits(b.getFloat(firstValueIndex + v)));
}
}
case BOOLEAN -> {
BooleanBlock b = (BooleanBlock) block;
for (int v = 0; v < valueCount; v++) {
scratch.append((byte) (b.getBoolean(firstValueIndex + v) ? 1 : 0));
}
}
case BYTES_REF -> {
BytesRefBlock b = (BytesRefBlock) block;
for (int v = 0; v < valueCount; v++) {
BytesRef ref = b.getBytesRef(firstValueIndex + v, scratchBytesRef);
writeVInt(ref.length);
scratch.append(ref.bytes, ref.offset, ref.length);
}
}
case NULL -> {
// already handled by isNull above; nothing extra to write
}
default -> throw new IllegalArgumentException("unsupported element type for group key encoding: " + type);
}
}

private void writeVInt(int value) {
while ((value & ~0x7F) != 0) {
scratch.append((byte) ((value & 0x7F) | 0x80));
value >>>= 7;
}
scratch.append((byte) value);
}

private void writeInt(int value) {
scratch.append((byte) (value >> 24));
scratch.append((byte) (value >> 16));
scratch.append((byte) (value >> 8));
scratch.append((byte) value);
}

private void writeLong(long value) {
writeInt((int) (value >> 32));
writeInt((int) value);
}

@Override
public long ramBytesUsed() {
long size = SHALLOW_SIZE;
size += RamUsageEstimator.sizeOf(groupChannels);

@ncordon ncordon Mar 10, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we were double accounting for this here and in the GroupedTopNOperator, I've changed it when I've renamed PositionKeyEncoder to GroupKeyEncoder

size += RamUsageEstimator.shallowSizeOf(elementTypes);
size += RamUsageEstimator.shallowSizeOfInstance(BytesRefBuilder.class);
size += RamUsageEstimator.sizeOf(scratch.bytes());
size += RamUsageEstimator.shallowSizeOfInstance(BytesRef.class);
return size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.operator.topn;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.PriorityQueue;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ObjectArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;

import java.util.ArrayList;
import java.util.List;

import static org.apache.lucene.util.RamUsageEstimator.shallowSizeOfInstance;

/**
* A queue that maintains a separate per-group priority queue, indexed by integer group IDs
* assigned by a {@link org.elasticsearch.compute.aggregation.blockhash.BlockHash}.
Comment thread
ivancea marked this conversation as resolved.
Outdated
* Uses a {@link BigArrays}-backed {@link ObjectArray} for better performance and circuit
* breaker integration.
*/
class GroupedQueue implements Accountable, Releasable {
private static final long SHALLOW_SIZE = shallowSizeOfInstance(GroupedQueue.class);

private final CircuitBreaker breaker;
private final BigArrays bigArrays;
private final int topCount;
private ObjectArray<PerGroupQueue> queues;

GroupedQueue(CircuitBreaker breaker, BigArrays bigArrays, int topCount) {
this.breaker = breaker;
this.bigArrays = bigArrays;
this.topCount = topCount;
this.queues = bigArrays.newObjectArray(0);
}

@Override
public String toString() {
return size() + "/" + queues.size() + "/" + topCount;
Comment thread
ivancea marked this conversation as resolved.
}

int size() {
int totalSize = 0;
Comment thread
ivancea marked this conversation as resolved.
for (long i = 0; i < queues.size(); i++) {
PerGroupQueue queue = queues.get(i);
if (queue != null) {
totalSize += queue.size();
}
}
return totalSize;
}

/**
* Attempts to add the row to the appropriate per-group queue based on {@link GroupedRow#groupId}.
* @return If the row was added and the queue was full, the evicted row.
* If the row was added and it wasn't full, {@code null}.
* If the row wasn't added, the input row.
*/
GroupedRow addRow(GroupedRow row) {
return getOrCreateQueue(row.groupId).addRow(row);
}

private PerGroupQueue getOrCreateQueue(long groupId) {
if (groupId >= queues.size()) {
queues = bigArrays.grow(queues, groupId + 1);
Comment thread
ivancea marked this conversation as resolved.
}
PerGroupQueue queue = queues.get(groupId);
if (queue == null) {
queue = PerGroupQueue.build(breaker, topCount);
queues.set(groupId, queue);
}
return queue;
}

/**
* Removes and returns all rows from all per-group queues.
* For an ascending order, the first element will be the min element (or last in the
* priority queue), and vice versa.
*/
List<GroupedRow> popAll() {
List<GroupedRow> allRows = new ArrayList<>(size());
for (long i = 0; i < queues.size(); i++) {
PerGroupQueue queue = queues.get(i);
if (queue != null) {
queue.popAllInto(allRows);
queue.close();
queues.set(i, null);
}
}
allRows.sort((r1, r2) -> -r1.compareTo(r2));
Comment thread
ivancea marked this conversation as resolved.
return allRows;
}

@Override
public long ramBytesUsed() {
long total = SHALLOW_SIZE;
if (queues != null) {
total += queues.ramBytesUsed();
for (long i = 0; i < queues.size(); i++) {
PerGroupQueue queue = queues.get(i);
if (queue != null) {
total += queue.ramBytesUsed();
}
}
}
return total;
}

@Override
public void close() {
Releasables.close(() -> {
if (queues != null) {
for (long i = 0; i < queues.size(); i++) {
PerGroupQueue queue = queues.get(i);
if (queue != null) {
queue.close();
queues.set(i, null);
}
}
}
}, queues);
}

/**
* A single-group priority queue backed by Lucene's PriorityQueue.
*/
static final class PerGroupQueue extends PriorityQueue<GroupedRow> implements Accountable, Releasable {
Comment thread
ivancea marked this conversation as resolved.
Outdated
private static final long SHALLOW_SIZE = shallowSizeOfInstance(PerGroupQueue.class);

private final CircuitBreaker breaker;
private final int topCount;

private PerGroupQueue(CircuitBreaker breaker, int topCount) {
super(topCount);
this.topCount = topCount;
this.breaker = breaker;
}

static PerGroupQueue build(CircuitBreaker breaker, int topCount) {
breaker.addEstimateBytesAndMaybeBreak(sizeOf(topCount), "topn");
return new PerGroupQueue(breaker, topCount);
}

@Override
protected boolean lessThan(GroupedRow lhs, GroupedRow rhs) {
return lhs.compareTo(rhs) < 0;
}

GroupedRow addRow(GroupedRow row) {
if (size() < topCount) {
add(row);
return null;
} else if (lessThan(top(), row)) {
GroupedRow evicted = top();
updateTop(row);
return evicted;
}
return row;
}

void popAllInto(List<GroupedRow> target) {
while (size() > 0) {
target.add(pop());
}
}

@Override
public long ramBytesUsed() {
long total = SHALLOW_SIZE;
total += RamUsageEstimator.alignObjectSize(
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * ((long) topCount + 1)
);
for (GroupedRow r : this) {
total += r == null ? 0 : r.ramBytesUsed();
}
return total;
}

@Override
public void close() {
Releasables.close(() -> {
var heapArray = getHeapArray();
for (int i = 0; i < heapArray.length; i++) {
GroupedRow row = (GroupedRow) heapArray[i];
if (row != null) {
row.close();
heapArray[i] = null;
}
}
}, () -> breaker.addWithoutBreaking(-sizeOf(topCount)));
}

private static long sizeOf(int topCount) {
long total = SHALLOW_SIZE;
total += RamUsageEstimator.alignObjectSize(
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + RamUsageEstimator.NUM_BYTES_OBJECT_REF * (topCount + 1L)
);
return total;
}
}
}
Loading