Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8251782
ESQL: Added GroupedTopNOperator for LIMIT BY, compute only
ivancea Mar 3, 2026
467ffbb
Format
ivancea Mar 3, 2026
75f84c7
Fixes failing tests
ncordon Mar 4, 2026
ee45151
Shares more code
ncordon Mar 4, 2026
93cd5c6
Merge branch 'main' into grouped-topn-operator
ncordon Mar 4, 2026
21afee5
Merge branch 'main' into grouped-topn-operator
ncordon Mar 4, 2026
b1d2469
Merge branch 'main' into grouped-topn-operator
ncordon Mar 5, 2026
b694a8c
Simplified grouped topn test
ivancea Mar 5, 2026
aa96d78
Make static test methods non-static
ivancea Mar 5, 2026
e4d7e11
Merge branch 'main' into grouped-topn-operator
ivancea Mar 5, 2026
af440c6
Initial GroupedTOpNOperator benchmarks
ivancea Mar 5, 2026
905b80a
Simplified tests removing redundant methods
ivancea Mar 5, 2026
7c57e43
Reorder private method
ivancea Mar 5, 2026
e453d22
Merge branch 'main' into grouped-topn-operator
ivancea Mar 5, 2026
0752772
Merge branch 'main' into grouped-topn-operator
ivancea Mar 6, 2026
cff0db2
Merge branch 'grouped-topn-operator' into grouped-topn-benchmark
ivancea Mar 6, 2026
ed995a0
Javadoc and renames, and removed outdated test
ivancea Mar 6, 2026
f5f96fb
Rename test operator
ivancea Mar 6, 2026
bce5c45
[CI] Auto commit changes from spotless
Mar 6, 2026
476e635
Fix double closing and simplify GroupedQueue tests
ivancea Mar 6, 2026
7af7541
Changed status groupCount to int and fixed TopNRow hashCode
ivancea Mar 9, 2026
2055ff8
Merge branch 'main' into grouped-topn-operator
ivancea Mar 9, 2026
b727c7f
Changes PositionKeyEncoder by GroupKeyEncoder
ncordon Mar 10, 2026
a7649ad
Removed redundant groupKeys to simplify double accounting
ivancea Mar 10, 2026
d4b993e
Merge branch 'main' into grouped-topn-operator
ivancea Mar 10, 2026
c341ac1
Merge branch 'grouped-topn-operator' into grouped-topn-benchmark
ivancea Mar 10, 2026
48fcc85
Added example with TopNBenchmark instead
ivancea Mar 10, 2026
26e7c9a
Revert "Added example with TopNBenchmark instead"
ivancea Mar 10, 2026
61b22d6
Fixed benchmark
ivancea Mar 10, 2026
adc17c5
Merge branch 'main' into grouped-topn-benchmark
ivancea Mar 11, 2026
bc1d185
Fixed boolean generation on topn benchmarks, and release pages
ivancea Mar 11, 2026
973f12a
Improved group keys
ivancea Mar 12, 2026
96e3756
Merge branch 'main' into grouped-topn-benchmark
ivancea Mar 12, 2026
9beae4d
[CI] Auto commit changes from spotless
Mar 12, 2026
b92407e
Merge branch 'main' into grouped-topn-benchmark
ivancea Mar 12, 2026
0338e2c
Merge branch 'main' into grouped-topn-benchmark
ivancea Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark._nightly.esql;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.benchmark.Utils;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BooleanBlock;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.compute.operator.GroupKeyEncoder;
import org.elasticsearch.compute.operator.Operator;
import org.elasticsearch.compute.operator.topn.GroupedTopNOperator;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.Releasables;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

/**
 * JMH benchmark that feeds shallow copies of one generated page into a {@code GroupedTopNOperator}
 * and drains its output, for every combination of the {@code @Param} fields declared below.
 * Loading the class runs {@link #selfTest()} from the static initializer.
 */
@Warmup(iterations = 5)
@Measurement(iterations = 7)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Thread)
@Fork(1)
public class GroupedTopNBenchmark {

// Block factory backed by non-recycling big arrays and a no-op circuit breaker named "none".
private static final BlockFactory blockFactory = BlockFactory.builder(BigArrays.NON_RECYCLING_INSTANCE)
.breaker(new NoopCircuitBreaker("none"))
.build();

// Number of positions in every generated block.
private static final int BLOCK_LENGTH = 4 * 1024;
// Number of page copies fed to the operator per benchmark invocation.
private static final int NUM_PAGES = 1024;
// Number of page copies fed to the operator per combination during selfTest().
private static final int SELF_TEST_PAGES = 16;

// Type names used to build the "data" and "groupKeys" parameter values.
private static final String LONGS = "longs";
private static final String INTS = "ints";
private static final String DOUBLES = "doubles";
private static final String BOOLEANS = "booleans";
private static final String BYTES_REFS = "bytes_refs";

// Sort direction suffixes appended to a type name in the "data" parameter.
private static final String ASC = "_asc";
private static final String DESC = "_desc";

// Separator between columns in a multi-column "data" or "groupKeys" value.
private static final String AND = "_and_";

static {
Utils.configureBenchmarkLogging();
// Smoke test all the expected values and force loading subclasses more like prod.
// This must stay below the field declarations above: selfTest() uses blockFactory.
selfTest();
}

/**
 * Runs every combination of the {@code @Param} values declared on {@code data}, {@code topCount},
 * {@code groupCount} and {@code groupKeys} once, feeding {@link #SELF_TEST_PAGES} pages per combination.
 *
 * @throws AssertionError if one of the parameter fields cannot be found reflectively, or if a
 *                        combination produces an unexpected number of output rows
 */
static void selfTest() {
    try {
        // Resolve each parameter list once instead of repeating the reflective lookup inside the nested loops.
        String[] dataValues = paramValues("data");
        String[] topCountValues = paramValues("topCount");
        String[] groupCountValues = paramValues("groupCount");
        String[] groupKeysValues = paramValues("groupKeys");

        for (String data : dataValues) {
            for (String topCount : topCountValues) {
                for (String groupCount : groupCountValues) {
                    for (String gk : groupKeysValues) {
                        run(data, Integer.parseInt(topCount), Integer.parseInt(groupCount), gk, SELF_TEST_PAGES);
                    }
                }
            }
        }
    } catch (NoSuchFieldException e) {
        // Keep the cause so the name of the missing field shows up in the failure.
        throw new AssertionError("benchmark parameter field not found", e);
    }
}

/**
 * Reads the values of the first {@code @Param} annotation on the public field with the given name.
 *
 * @param fieldName name of a public field of this class that is annotated with {@code @Param}
 * @return the values declared in the annotation, in declaration order
 * @throws NoSuchFieldException if this class has no public field with that name
 */
private static String[] paramValues(String fieldName) throws NoSuchFieldException {
    return GroupedTopNBenchmark.class.getField(fieldName).getAnnotationsByType(Param.class)[0].value();
}

/**
 * Sort columns to generate: one or more type names, each suffixed with {@code _asc} or {@code _desc},
 * joined by {@code _and_}.
 */
@Param({ LONGS + ASC, LONGS + DESC, BYTES_REFS + ASC, LONGS + ASC + AND + LONGS + ASC, LONGS + ASC + AND + BYTES_REFS + ASC })
public String data;

/** Top count passed to the operator; the expected output is capped at this many rows per group. */
@Param({ "1", "10", "1000" })
public int topCount;

/** Number of distinct groups the generated group key columns cycle through (capped at the block length). */
@Param({ "10", "100", "1000" })
public int groupCount;

/** Group key columns to generate: one or more type names (no sort suffix) joined by {@code _and_}. */
@Param({ LONGS, BYTES_REFS, LONGS + AND + LONGS, BYTES_REFS + AND + BYTES_REFS, LONGS + AND + BYTES_REFS })
public String groupKeys;

/**
 * Builds the {@link GroupedTopNOperator} under test.
 * Channels are laid out as the sort columns from {@code data} followed by the group key columns
 * from {@code groupKeys}.
 *
 * @param data      sort column spec, e.g. {@code longs_asc_and_bytes_refs_asc}
 * @param topCount  top count handed to the operator
 * @param groupKeys group key column spec, e.g. {@code longs_and_bytes_refs}
 * @return a new operator; the caller is responsible for closing it
 * @throws IllegalArgumentException if a column names an unsupported type or a sort column has no direction suffix
 */
private static Operator operator(String data, int topCount, String groupKeys) {
    String[] sortColumns = data.split(AND);

    // Sort columns occupy the leading channels.
    List<ElementType> elementTypes = new ArrayList<>();
    for (String column : sortColumns) {
        elementTypes.add(elementType(column));
    }
    List<TopNEncoder> encoders = new ArrayList<>();
    for (String column : sortColumns) {
        encoders.add(encoder(column));
    }
    TopNOperator.SortOrder[] orders = new TopNOperator.SortOrder[sortColumns.length];
    for (int channel = 0; channel < sortColumns.length; channel++) {
        orders[channel] = sortOrder(channel, sortColumns[channel]);
    }
    List<TopNOperator.SortOrder> sortOrders = List.of(orders);

    // Group key columns are appended after the sort columns and are never sorted on.
    String[] keyColumns = groupKeys.split(AND);
    int firstKeyChannel = elementTypes.size();
    int[] groupKeyChannels = new int[keyColumns.length];
    for (int k = 0; k < keyColumns.length; k++) {
        groupKeyChannels[k] = firstKeyChannel + k;
        elementTypes.add(elementType(keyColumns[k]));
        encoders.add(TopNEncoder.DEFAULT_UNSORTABLE);
    }

    GroupKeyEncoder keyEncoder = new GroupKeyEncoder(
        groupKeyChannels,
        elementTypes,
        new BreakingBytesRefBuilder(blockFactory.breaker(), "group-key-encoder")
    );
    // NOTE(review): the last two arguments look like a page-size bound and a size limit — confirm against the constructor.
    return new GroupedTopNOperator(
        blockFactory,
        blockFactory.breaker(),
        topCount,
        elementTypes,
        encoders,
        sortOrders,
        keyEncoder,
        8 * 1024,
        Long.MAX_VALUE
    );
}

/**
 * Maps a column spec such as {@code longs_asc} or {@code bytes_refs} to its {@link ElementType}.
 * Any {@code _asc} or {@code _desc} marker is removed before the type name is matched.
 *
 * @param data column spec, with or without a sort direction suffix
 * @return the element type for that column
 * @throws IllegalArgumentException if the type name is not one of the supported ones
 */
private static ElementType elementType(String data) {
    String typeName = data.replace(ASC, "").replace(DESC, "");
    return switch (typeName) {
        case BYTES_REFS -> ElementType.BYTES_REF;
        case BOOLEANS -> ElementType.BOOLEAN;
        case DOUBLES -> ElementType.DOUBLE;
        case INTS -> ElementType.INT;
        case LONGS -> ElementType.LONG;
        default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
    };
}

/**
 * Picks the {@link TopNEncoder} for a sort column: {@code UTF8} for bytes refs and
 * {@code DEFAULT_SORTABLE} for the numeric and boolean types.
 *
 * @param data column spec, with or without a sort direction suffix
 * @return the encoder to use for that column
 * @throws IllegalArgumentException if the type name is not one of the supported ones
 */
private static TopNEncoder encoder(String data) {
    String typeName = data.replace(ASC, "").replace(DESC, "");
    return switch (typeName) {
        case BYTES_REFS -> TopNEncoder.UTF8;
        case LONGS -> TopNEncoder.DEFAULT_SORTABLE;
        case INTS -> TopNEncoder.DEFAULT_SORTABLE;
        case DOUBLES -> TopNEncoder.DEFAULT_SORTABLE;
        case BOOLEANS -> TopNEncoder.DEFAULT_SORTABLE;
        default -> throw new IllegalArgumentException("unsupported data type [" + data + "]");
    };
}

/**
 * Reads the sort direction from the suffix of a column spec.
 *
 * @param data column spec ending in {@code _asc} or {@code _desc}
 * @return {@code true} for ascending, {@code false} for descending
 * @throws IllegalArgumentException if the spec ends in neither suffix
 */
private static boolean ascDesc(String data) {
    if (data.endsWith(ASC)) {
        return true;
    }
    if (data.endsWith(DESC)) {
        return false;
    }
    throw new IllegalArgumentException("data neither asc nor desc: " + data);
}

/**
 * Creates the sort order for one sort column.
 *
 * @param channel channel index of the column within the page
 * @param data    column spec ending in {@code _asc} or {@code _desc}
 * @return the sort order for that channel
 * @throws IllegalArgumentException if the spec carries no sort direction suffix
 */
private static TopNOperator.SortOrder sortOrder(int channel, String data) {
    boolean ascending = ascDesc(data);
    // NOTE(review): the trailing false is presumably "nulls first" — confirm against SortOrder.
    return new TopNOperator.SortOrder(channel, ascending, false);
}

/**
 * Verifies that the operator emitted the expected number of rows.
 * Each group contributes {@code min(topCount, rows fed for that group)}; the rows fed for a group are
 * its share of one block times {@code numPages}, with the first {@code BLOCK_LENGTH % groups} groups
 * receiving one extra row per page.
 *
 * @param topCount   top count the operator was built with
 * @param groupCount requested number of groups (capped at the block length)
 * @param numPages   number of page copies that were fed to the operator
 * @param pages      output pages drained from the operator
 * @throws AssertionError if the total output position count differs from the expected one
 */
private static void checkExpected(int topCount, int groupCount, int numPages, List<Page> pages) {
    int groups = Math.min(groupCount, BLOCK_LENGTH);
    long expected = IntStream.range(0, groups).mapToLong(group -> {
        int extraRow = group < BLOCK_LENGTH % groups ? 1 : 0;
        long rowsFed = (long) (BLOCK_LENGTH / groups + extraRow) * numPages;
        return Math.min(topCount, rowsFed);
    }).sum();

    long actual = 0;
    for (Page page : pages) {
        actual += page.getPositionCount();
    }

    if (actual == expected) {
        return;
    }
    throw new AssertionError("expected [" + expected + "] but got [" + actual + "]");
}

/**
 * Generates one input page: the random sort columns described by {@code data}, followed by the
 * group key columns described by {@code groupKeys}.
 *
 * @param data       sort column spec
 * @param groupCount requested number of groups (capped at the block length)
 * @param groupKeys  group key column spec
 * @return a page whose blocks all have {@code BLOCK_LENGTH} positions
 */
private static Page page(String data, int groupCount, String groupKeys) {
    String[] sortColumns = data.split(AND);
    String[] keyColumns = groupKeys.split(AND);
    int groups = Math.min(groupCount, BLOCK_LENGTH);
    // Used to split a group id into two key components when there is more than one key column.
    int divisor = (int) Math.ceil(Math.sqrt(groups));

    Block[] blocks = new Block[sortColumns.length + keyColumns.length];
    int channel = 0;
    for (String column : sortColumns) {
        blocks[channel++] = block(column);
    }
    for (int keyIndex = 0; keyIndex < keyColumns.length; keyIndex++) {
        blocks[channel++] = groupKeyBlock(keyColumns[keyIndex], groups, divisor, keyIndex, keyColumns.length);
    }
    return new Page(blocks);
}

/**
 * Generates a block of {@code BLOCK_LENGTH} random, non-negative values for one sort column.
 * A new unseeded {@link Random} is used per call, so values differ between invocations.
 *
 * @param data column spec, with or without a sort direction suffix
 * @return the generated block
 * @throws UnsupportedOperationException if the type name is not one of the supported ones
 */
private static Block block(String data) {
    String typeName = data.replace(ASC, "").replace(DESC, "");
    return switch (typeName) {
        case LONGS -> {
            var longs = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
            new Random().longs(BLOCK_LENGTH, 0, Long.MAX_VALUE).forEach(value -> longs.appendLong(value));
            yield longs.build();
        }
        case INTS -> {
            var ints = blockFactory.newIntBlockBuilder(BLOCK_LENGTH);
            new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE).forEach(value -> ints.appendInt(value));
            yield ints.build();
        }
        case DOUBLES -> {
            var doubles = blockFactory.newDoubleBlockBuilder(BLOCK_LENGTH);
            new Random().doubles(BLOCK_LENGTH, 0, Double.MAX_VALUE).forEach(value -> doubles.appendDouble(value));
            yield doubles.build();
        }
        case BOOLEANS -> {
            BooleanBlock.Builder booleans = blockFactory.newBooleanBlockBuilder(BLOCK_LENGTH);
            // The upper bound is exclusive, so this draws 0 or 1.
            new Random().ints(BLOCK_LENGTH, 0, 2).forEach(bit -> booleans.appendBoolean(bit == 1));
            yield booleans.build();
        }
        case BYTES_REFS -> {
            BytesRefBlock.Builder bytesRefs = blockFactory.newBytesRefBlockBuilder(BLOCK_LENGTH);
            // Each value is the decimal text of a random non-negative int.
            new Random().ints(BLOCK_LENGTH, 0, Integer.MAX_VALUE).forEach(value -> {
                String text = Integer.toString(value);
                bytesRefs.appendBytesRef(new BytesRef(text));
            });
            yield bytesRefs.build();
        }
        default -> throw new UnsupportedOperationException("unsupported data [" + data + "]");
    };
}

/**
 * Generates one group key column of {@code BLOCK_LENGTH} positions.
 * Position {@code i} belongs to group {@code i % effectiveGroupCount}; see
 * {@link #groupKeyValue(int, int, int, int, int)} for how the group id becomes this column's key.
 *
 * @param groupKeyType        type name of the key column ({@code longs} or {@code bytes_refs})
 * @param effectiveGroupCount number of distinct groups to cycle through
 * @param divisor             divisor used to split a group id across multiple key columns
 * @param keyIndex            index of this column among the group key columns
 * @param groupKeyCount       total number of group key columns
 * @return the generated block
 * @throws IllegalArgumentException if the key type is not supported
 */
private static Block groupKeyBlock(String groupKeyType, int effectiveGroupCount, int divisor, int keyIndex, int groupKeyCount) {
    return switch (groupKeyType) {
        case BYTES_REFS -> {
            BytesRefBlock.Builder keys = blockFactory.newBytesRefBlockBuilder(BLOCK_LENGTH);
            for (int position = 0; position < BLOCK_LENGTH; position++) {
                long key = groupKeyValue(position, effectiveGroupCount, divisor, keyIndex, groupKeyCount);
                keys.appendBytesRef(new BytesRef(Long.toString(key)));
            }
            yield keys.build();
        }
        case LONGS -> {
            var keys = blockFactory.newLongBlockBuilder(BLOCK_LENGTH);
            for (int position = 0; position < BLOCK_LENGTH; position++) {
                keys.appendLong(groupKeyValue(position, effectiveGroupCount, divisor, keyIndex, groupKeyCount));
            }
            yield keys.build();
        }
        default -> throw new IllegalArgumentException("unsupported group key type [" + groupKeyType + "]");
    };
}

/**
 * Computes the key of one column for the row at {@code position}.
 * With a single key column the key is the group id itself; otherwise the first column holds
 * {@code groupId / divisor} and every other column holds {@code groupId % divisor}.
 */
private static long groupKeyValue(int position, int effectiveGroupCount, int divisor, int keyIndex, int groupKeyCount) {
    int groupId = position % effectiveGroupCount;
    if (groupKeyCount == 1) {
        return groupId;
    }
    if (keyIndex == 0) {
        return groupId / divisor;
    }
    return groupId % divisor;
}

/**
 * JMH entry point: runs one full pass with the current parameter values, feeding {@code NUM_PAGES}
 * page copies to the operator. Results are reported per input row
 * ({@code NUM_PAGES * BLOCK_LENGTH} operations per invocation).
 */
@Benchmark
@OperationsPerInvocation(NUM_PAGES * BLOCK_LENGTH)
public void run() {
    run(this.data, this.topCount, this.groupCount, this.groupKeys, NUM_PAGES);
}

/**
 * Builds an operator and one input page, feeds {@code numPages} shallow copies of that page to the
 * operator, drains the output and checks the number of rows produced.
 *
 * @param data       sort column spec
 * @param topCount   top count handed to the operator
 * @param groupCount requested number of groups
 * @param groupKeys  group key column spec
 * @param numPages   number of page copies to feed
 * @throws AssertionError if the operator emits an unexpected number of rows
 */
private static void run(String data, int topCount, int groupCount, String groupKeys, int numPages) {
    try (Operator operator = operator(data, topCount, groupKeys)) {
        Page page = page(data, groupCount, groupKeys);
        try {
            for (int i = 0; i < numPages; i++) {
                // The operator takes ownership of each copy; the template page stays ours.
                operator.addInput(page.shallowCopy());
            }
        } finally {
            // Drop our own reference to the template page's blocks, which was previously never released.
            page.releaseBlocks();
        }
        operator.finish();
        List<Page> results = new ArrayList<>();
        try {
            Page p;
            while ((p = operator.getOutput()) != null) {
                results.add(p);
            }
            checkExpected(topCount, groupCount, numPages, results);
        } finally {
            // Release the output pages even when the check fails.
            Releasables.close(results);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.compute.operator.topn.SharedMinCompetitive;
import org.elasticsearch.compute.operator.topn.TopNEncoder;
import org.elasticsearch.compute.operator.topn.TopNOperator;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.indices.breaker.CircuitBreakerMetrics;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
Expand Down Expand Up @@ -127,7 +128,7 @@ static void selfTest() {
public int topCount;

private static Operator operator(String data, int topCount, boolean sortedInput) {
String[] dataSpec = data.split("_and_");
String[] dataSpec = data.split(AND);
List<ElementType> elementTypes = Arrays.stream(dataSpec).map(TopNBenchmark::elementType).toList();
List<TopNEncoder> encoders = Arrays.stream(dataSpec).map(TopNBenchmark::encoder).toList();
List<TopNOperator.SortOrder> sortOrders = IntStream.range(0, dataSpec.length).mapToObj(c -> sortOrder(c, dataSpec[c])).toList();
Expand Down Expand Up @@ -204,7 +205,7 @@ private static void checkExpected(int topCount, List<Page> pages) {
}

private static Page page(boolean sortedInput, String data) {
String[] dataSpec = data.split("_and_");
String[] dataSpec = data.split(AND);
return new Page(Arrays.stream(dataSpec).map(d -> block(sortedInput, d)).toArray(Block[]::new));
}

Expand All @@ -230,7 +231,7 @@ private static Block block(boolean sortedInput, String data) {
}
case BOOLEANS -> {
BooleanBlock.Builder builder = blockFactory.newBooleanBlockBuilder(BLOCK_LENGTH);
maybeSort(sortedInput, data, new Random().ints(BLOCK_LENGTH, 0, 1).boxed()).forEach(i -> builder.appendBoolean(i == 1));
maybeSort(sortedInput, data, new Random().ints(BLOCK_LENGTH, 0, 2).boxed()).forEach(i -> builder.appendBoolean(i == 1));
yield builder.build();
}
case BYTES_REFS -> {
Expand Down Expand Up @@ -268,11 +269,15 @@ private static void run(String data, int topCount, boolean sortedInput) {
}
operator.finish();
List<Page> results = new ArrayList<>();
Page p;
while ((p = operator.getOutput()) != null) {
results.add(p);
try {
Page p;
while ((p = operator.getOutput()) != null) {
results.add(p);
}
checkExpected(topCount, results);
} finally {
Releasables.close(results);
}
checkExpected(topCount, results);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.benchmark._nightly.esql;

import org.elasticsearch.test.ESTestCase;

/**
 * Runs the self test of {@link GroupedTopNBenchmark} as a regular unit test.
 */
public class GroupedTopNBenchmarkTests extends ESTestCase {

    /** Delegates to {@link GroupedTopNBenchmark#selfTest()}; any error it throws fails the test. */
    public void test() {
        GroupedTopNBenchmark.selfTest();
    }
}
Loading