Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@

package org.elasticsearch.compute.operator;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.RamUsageEstimator;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHashTable;
import org.elasticsearch.common.util.IntArray;
Expand All @@ -24,6 +27,7 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

Expand All @@ -32,7 +36,7 @@
* For {@code LIMIT N BY x}, this accepts up to N rows for each distinct value of x.
* Group keys use list semantics for multivalues: {@code [1,2]} and {@code [2,1]} are different groups.
*/
public class GroupedLimitOperator implements Operator {
public class GroupedLimitOperator implements Operator, Accountable {
public static final class Factory implements Operator.OperatorFactory {
private final int limitPerGroup;
private final int[] groupChannels;
Expand All @@ -53,10 +57,12 @@ public GroupedLimitOperator get(DriverContext driverContext) {

@Override
public String describe() {
return "GroupedLimitOperator[limit = " + limitPerGroup + "]";
return "GroupedLimitOperator[limitPerGroup = " + limitPerGroup + "]";
}
}

private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(GroupedLimitOperator.class);

private final int limitPerGroup;
private final GroupKeyEncoder keyEncoder;
/**
Expand Down Expand Up @@ -187,9 +193,18 @@ public Page getOutput() {
return result;
}

/**
 * Reports the memory attributed to this operator: the shallow size of the
 * operator instance plus the values reported by {@code seenKeys},
 * {@code counts} and {@code keyEncoder}.
 *
 * @return the sum of those four figures, in bytes
 */
@Override
public long ramBytesUsed() {
    return SHALLOW_SIZE + seenKeys.ramBytesUsed() + counts.ramBytesUsed() + keyEncoder.ramBytesUsed();
}

@Override
public Status status() {
return new Status(limitPerGroup, (int) seenKeys.size(), pagesProcessed, rowsReceived, rowsEmitted);
return new Status(limitPerGroup, (int) seenKeys.size(), pagesProcessed, rowsReceived, rowsEmitted, ramBytesUsed());
}

@Override
Expand All @@ -199,7 +214,13 @@ public void close() {

@Override
public String toString() {
return "GroupedLimitOperator[limit = " + limitPerGroup + ", groups = " + seenKeys.size() + "]";
return "GroupedLimitOperator[limitPerGroup = "
+ limitPerGroup
+ ", groupKeys = "
+ Arrays.toString(keyEncoder.groupChannels())
+ ", groups = "
+ seenKeys.size()
+ "]";
}

public static class Status implements Operator.Status {
Expand All @@ -214,13 +235,15 @@ public static class Status implements Operator.Status {
private final int pagesProcessed;
private final long rowsReceived;
private final long rowsEmitted;
private final long ramBytesUsed;

protected Status(int limitPerGroup, int groupCount, int pagesProcessed, long rowsReceived, long rowsEmitted) {
protected Status(int limitPerGroup, int groupCount, int pagesProcessed, long rowsReceived, long rowsEmitted, long ramBytesUsed) {
this.limitPerGroup = limitPerGroup;
this.groupCount = groupCount;
this.pagesProcessed = pagesProcessed;
this.rowsReceived = rowsReceived;
this.rowsEmitted = rowsEmitted;
this.ramBytesUsed = ramBytesUsed;
}

protected Status(StreamInput in) throws IOException {
Expand All @@ -229,6 +252,7 @@ protected Status(StreamInput in) throws IOException {
pagesProcessed = in.readVInt();
rowsReceived = in.readVLong();
rowsEmitted = in.readVLong();
ramBytesUsed = in.readVLong();
}

@Override
Expand All @@ -238,6 +262,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(pagesProcessed);
out.writeVLong(rowsReceived);
out.writeVLong(rowsEmitted);
out.writeVLong(ramBytesUsed);
}
Comment on lines 249 to 266
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GroupedLimitOperator.Status adds ramBytesUsed to the wire format unconditionally (readVLong/writeVLong). This breaks BWC for mixed-version clusters because older nodes will attempt to deserialize the status without this extra field. Gate the new field behind a TransportVersion feature flag (and default ramBytesUsed to 0 when unsupported), similar to HashAggregationOperator.Status / TopNOperatorStatus.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think a TransportVersion gate is needed here, since this status is not used yet (the operator is not linked into planning). It may still be worth getting a second opinion, though.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This BWC concern does not apply here, because this operator is not in use yet.


@Override
Expand Down Expand Up @@ -265,6 +290,10 @@ public long rowsEmitted() {
return rowsEmitted;
}

/**
 * Accessor for the {@code ramBytesUsed} value stored in this status.
 *
 * @return the stored {@code ramBytesUsed} value
 */
public long ramBytesUsed() {
    return this.ramBytesUsed;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand All @@ -273,6 +302,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("pages_processed", pagesProcessed);
builder.field("rows_received", rowsReceived);
builder.field("rows_emitted", rowsEmitted);
builder.field("ram_bytes_used", ramBytesUsed);
builder.field("ram_used", ByteSizeValue.ofBytes(ramBytesUsed));
return builder.endObject();
}

Expand All @@ -285,12 +316,13 @@ public boolean equals(Object o) {
&& groupCount == status.groupCount
&& pagesProcessed == status.pagesProcessed
&& rowsReceived == status.rowsReceived
&& rowsEmitted == status.rowsEmitted;
&& rowsEmitted == status.rowsEmitted
&& ramBytesUsed == status.ramBytesUsed;
}

@Override
public int hashCode() {
return Objects.hash(limitPerGroup, groupCount, pagesProcessed, rowsReceived, rowsEmitted);
return Objects.hash(limitPerGroup, groupCount, pagesProcessed, rowsReceived, rowsEmitted, ramBytesUsed);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

public class GroupedLimitOperatorStatusTests extends AbstractWireSerializingTestCase<GroupedLimitOperator.Status> {
public void testToXContent() {
assertThat(Strings.toString(new GroupedLimitOperator.Status(10, 5, 3, 111, 222)), equalTo("""
{"limit_per_group":10,"group_count":5,"pages_processed":3,"rows_received":111,"rows_emitted":222}"""));
assertThat(Strings.toString(new GroupedLimitOperator.Status(10, 5, 3, 111, 222, 4096)), equalTo("""
{"limit_per_group":10,"group_count":5,"pages_processed":3,"rows_received":111,"rows_emitted":222,\
"ram_bytes_used":4096,"ram_used":"4kb"}"""));
}

@Override
Expand All @@ -34,6 +35,7 @@ protected GroupedLimitOperator.Status createTestInstance() {
randomNonNegativeInt(),
randomNonNegativeInt(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
}
Expand All @@ -45,14 +47,16 @@ protected GroupedLimitOperator.Status mutateInstance(GroupedLimitOperator.Status
int pagesProcessed = instance.pagesProcessed();
long rowsReceived = instance.rowsReceived();
long rowsEmitted = instance.rowsEmitted();
switch (between(0, 4)) {
long ramBytesUsed = instance.ramBytesUsed();
switch (between(0, 5)) {
case 0 -> limitPerGroup = randomValueOtherThan(limitPerGroup, ESTestCase::randomNonNegativeInt);
case 1 -> groupCount = randomValueOtherThan(groupCount, ESTestCase::randomNonNegativeInt);
case 2 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
case 3 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
case 4 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
case 5 -> ramBytesUsed = randomValueOtherThan(ramBytesUsed, ESTestCase::randomNonNegativeLong);
default -> throw new IllegalArgumentException();
}
return new GroupedLimitOperator.Status(limitPerGroup, groupCount, pagesProcessed, rowsReceived, rowsEmitted);
return new GroupedLimitOperator.Status(limitPerGroup, groupCount, pagesProcessed, rowsReceived, rowsEmitted, ramBytesUsed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

Expand All @@ -44,12 +45,12 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {

@Override
protected Matcher<String> expectedDescriptionOfSimple() {
return equalTo("GroupedLimitOperator[limit = 100]");
return equalTo("GroupedLimitOperator[limitPerGroup = 100]");
}

@Override
protected Matcher<String> expectedToStringOfSimple() {
return equalTo("GroupedLimitOperator[limit = 100, groups = 0]");
return equalTo("GroupedLimitOperator[limitPerGroup = 100, groupKeys = [0], groups = 0]");
}

@Override
Expand Down Expand Up @@ -419,7 +420,9 @@ protected void assertStatus(Map<String, Object> map, List<Page> input, List<Page
.entry("group_count", greaterThanOrEqualTo(0))
.entry("pages_processed", output.size())
.entry("rows_received", allOf(greaterThanOrEqualTo(emittedRows), lessThanOrEqualTo(inputRows)))
.entry("rows_emitted", emittedRows);
.entry("rows_emitted", emittedRows)
.entry("ram_bytes_used", greaterThanOrEqualTo(0))
.entry("ram_used", notNullValue());

assertMap(map, mapMatcher);
}
Expand Down
Loading