From 1ac55edd45a87bd6d04cca5387c44efc35ee477e Mon Sep 17 00:00:00 2001 From: Fang Xing Date: Mon, 5 Jan 2026 10:16:36 -0500 Subject: [PATCH 1/4] converted PackedValuesBlockHash.bytes to BreakingBytesRefBuilder --- .../blockhash/PackedValuesBlockHash.java | 23 ++++++++++++------- .../operator/mvdedupe/BatchEncoder.java | 21 +++++++++-------- .../mvdedupe/MultivalueDedupeTests.java | 8 ++++--- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index 6eb3aef6dfc8b..b6ba5e008e536 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -8,7 +8,6 @@ package org.elasticsearch.compute.aggregation.blockhash; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; @@ -21,6 +20,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.compute.operator.mvdedupe.BatchEncoder; import org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupe; import org.elasticsearch.core.Releasable; @@ -63,7 +63,7 @@ final class PackedValuesBlockHash extends BlockHash { private final int emitBatchSize; private final BytesRefHash bytesRefHash; private final int nullTrackingBytes; - private final BytesRefBuilder bytes = new BytesRefBuilder(); + private final BreakingBytesRefBuilder bytes; private final List specs; PackedValuesBlockHash(List specs, BlockFactory blockFactory, int emitBatchSize) { @@ -71,8 +71,14 @@ final class PackedValuesBlockHash extends BlockHash { this.specs = specs; this.emitBatchSize = emitBatchSize; this.bytesRefHash = new BytesRefHash(1, blockFactory.bigArrays()); - this.nullTrackingBytes = (specs.size() + 7) / 8; - bytes.grow(nullTrackingBytes); + try { + this.nullTrackingBytes = (specs.size() + 7) / 8; + this.bytes = new BreakingBytesRefBuilder(blockFactory.breaker(), "PackedValuesBlockHash", this.nullTrackingBytes); + } catch (Exception e) { + // close bytesRefHash to prevent memory leaks in case of the initialization of bytes fails + this.bytesRefHash.close(); + throw e; + } } @Override @@ -147,14 +153,14 @@ void add() { private void addSingleEntry() { fillBytesSv(groups); - appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get())))); + appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView())))); } private void addMultipleEntries() { int g = 0; do { fillBytesMv(groups, g); - appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get())))); + appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView())))); g = rewindKeys(groups); } while (g >= 0); finishMv(); @@ -216,7 +222,7 @@ public IntBlock next() { private void lookupSingleEntry(IntBlock.Builder ords) { fillBytesSv(groups); - long found = bytesRefHash.find(bytes.get()); + long found = bytesRefHash.find(bytes.bytesRefView()); if (found < 0) { ords.appendNull(); } else { @@ -233,7 +239,7 @@ private void lookupMultipleEntries(IntBlock.Builder ords) { fillBytesMv(groups, g); // emit ords - long found = bytesRefHash.find(bytes.get()); + long found = bytesRefHash.find(bytes.bytesRefView()); if (found >= 0) { if (firstFound < 0) { firstFound = found; @@ -414,6 +420,7 @@ public BitArray seenGroupIds(BigArrays bigArrays) { @Override public void close() { bytesRefHash.close(); + bytes.close(); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java index 5460210b688eb..a3a315fa7d1e4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java @@ -19,6 +19,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.core.Releasable; import java.lang.invoke.MethodHandles; @@ -73,7 +74,7 @@ public static Decoder decoder(ElementType elementType) { * * @return the number of bytes has read */ - public abstract int read(int index, BytesRefBuilder dst); + public abstract int read(int index, BreakingBytesRefBuilder dst); /** * Encodes the next batch of entries. This will encode values until the next @@ -165,7 +166,7 @@ public final int valueCount(int positionOffset) { * no random-access way to get the first index for a position. */ @Override - public final int read(int index, BytesRefBuilder dst) { + public final int read(int index, BreakingBytesRefBuilder dst) { int start = valueOffsets[index]; int length = valueOffsets[index + 1] - start; if (length > 0) { @@ -286,7 +287,7 @@ public final int valueCount(int positionOffset) { } @Override - public int read(int index, BytesRefBuilder dst) { + public int read(int index, BreakingBytesRefBuilder dst) { if (valueCount == 0) { assert index == 0 : index; return 0; @@ -296,7 +297,7 @@ public int read(int index, BytesRefBuilder dst) { } } - protected abstract int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst); + protected abstract int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst); @Override public final long ramBytesUsed() { @@ -366,7 +367,7 @@ protected static final class DirectInts extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Integer.BYTES; dst.grow(after); @@ -416,7 +417,7 @@ protected static final class DirectLongs extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Long.BYTES; dst.grow(after); @@ -484,7 +485,7 @@ protected static final class DirectDoubles extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Double.BYTES; dst.grow(after); @@ -546,7 +547,7 @@ protected static final class DirectBooleans extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { var v = ((BooleanBlock) block).getBoolean(valueIndex); dst.append((byte) (v ? 1 : 0)); return 1; @@ -613,7 +614,7 @@ protected static final class DirectBytesRefs extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { var v = ((BytesRefBlock) block).getBytesRef(valueIndex, scratch); int start = dst.length(); dst.grow(start + Integer.BYTES + v.length); @@ -654,7 +655,7 @@ protected static final class DirectNulls extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { assert false : "all positions all nulls"; throw new IllegalStateException("all positions all nulls"); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java index 1d1401f51f018..2ca24dac24c9c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java @@ -10,8 +10,8 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefHash; @@ -26,6 +26,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.compute.test.BlockTestUtils; import org.elasticsearch.compute.test.RandomBlock; import org.elasticsearch.compute.test.TestBlockFactory; @@ -493,10 +494,11 @@ private int assertEncodedPosition(RandomBlock b, BatchEncoder encoder, int posit */ Block.Builder builder = elementType.newBlockBuilder(encoder.valueCount(offset), TestBlockFactory.getNonBreakingInstance()); BytesRef[] toDecode = new BytesRef[encoder.valueCount(offset)]; + CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST); for (int i = 0; i < toDecode.length; i++) { - BytesRefBuilder dest = new BytesRefBuilder(); + BreakingBytesRefBuilder dest = new BreakingBytesRefBuilder(breaker, "test"); encoder.read(valueOffset++, dest); - toDecode[i] = dest.toBytesRef(); + toDecode[i] = dest.bytesRefView(); if (b.values().get(position) == null) { // Nulls are encoded as 0 length values assertThat(toDecode[i].length, equalTo(0)); From 7e718a79b7492b095b908375f382dc4413835d57 Mon Sep 17 00:00:00 2001 From: Fang Xing <155562079+fang-xing-esql@users.noreply.github.com> Date: Mon, 5 Jan 2026 10:24:55 -0500 Subject: [PATCH 2/4] Update docs/changelog/140171.yaml --- docs/changelog/140171.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/140171.yaml diff --git a/docs/changelog/140171.yaml b/docs/changelog/140171.yaml new file mode 100644 index 0000000000000..30637abba6dd7 --- /dev/null +++ b/docs/changelog/140171.yaml @@ -0,0 +1,6 @@ +pr: 140171 +summary: Converted `PackedValuesBlockHash.bytes` to `BreakingBytesRefBuilder` for + better memory tracking +area: ES|QL +type: enhancement +issues: [] From a3760453e1e990a346120626a900b40e7195500d Mon Sep 17 00:00:00 2001 From: Fang Xing Date: Wed, 7 Jan 2026 21:45:59 -0500 Subject: [PATCH 3/4] use dedicated circuit breaker for PackedValuesBlockHash.bytes in test --- .../blockhash/PackedValuesBlockHash.java | 29 +++++++++++++++++-- ...kedValuesBlockHashCircuitBreakerTests.java | 15 ++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index ecf39da94299a..d559af5d7f8de 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.aggregation.blockhash; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; @@ -71,13 +72,35 @@ final class PackedValuesBlockHash extends BlockHash { this.specs = specs; this.emitBatchSize = emitBatchSize; this.nullTrackingBytes = (specs.size() + 7) / 8; + boolean success = false; try { this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory); this.bytes = new BreakingBytesRefBuilder(blockFactory.breaker(), "PackedValuesBlockHash", this.nullTrackingBytes); - } catch (Exception e) { + success = true; + } finally { + // close bytesRefHash and bytes to prevent memory leaks in case of the initialization fails + if (success == false) { + close(); + } + } + } + + // For circuit breaker testing only {@code PackedValuesBlockHashCircuitBreakerTests} + PackedValuesBlockHash(List specs, BlockFactory blockFactory, CircuitBreaker circuitBreaker, int emitBatchSize) { + super(blockFactory); + this.specs = specs; + this.emitBatchSize = emitBatchSize; + this.nullTrackingBytes = (specs.size() + 7) / 8; + boolean success = false; + try { + this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory); + this.bytes = new BreakingBytesRefBuilder(circuitBreaker, "PackedValuesBlockHash", this.nullTrackingBytes); + success = true; + } finally { // close bytesRefHash and bytes to prevent memory leaks in case of the initialization fails - close(); - throw e; + if (success == false) { + close(); + } } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java index d05bf0c46ab8b..2ad0627a60556 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java @@ -10,19 +10,19 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BytesRefBlock; import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntArrayBlock; import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; -import org.elasticsearch.compute.test.MockBlockFactory; import java.util.ArrayList; import java.util.List; @@ -34,12 +34,11 @@ public class PackedValuesBlockHashCircuitBreakerTests extends BlockHashTestCase /** * Set the breaker limit low enough, and test that adding many(1000) groups of BYTES_REF into bytes {@code BreakingBytesRefBuilder} - * , which is reused for each grouping set will trigger CBE. CBE happens when adding around 25th group to bytes. + * , which is reused for each grouping set, will trigger CBE. CBE happens when adding around 11th group to bytes. */ public void testCircuitBreakerWithManyGroups() { - CircuitBreaker breaker = new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofBytes(220000)); - BigArrays bigArrays = new MockBigArrays(PageCacheRecycler.NON_RECYCLING_INSTANCE, mockBreakerService(breaker)); - MockBlockFactory blockFactory = new MockBlockFactory(breaker, bigArrays); + CircuitBreaker bytesBreaker = new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofKb(1)); + BlockFactory blockFactory = BlockFactory.getInstance(new NoopCircuitBreaker("test"), BigArrays.NON_RECYCLING_INSTANCE); // 1000 group keys of BYTES_REF List groupSpecs = new ArrayList<>(); @@ -48,7 +47,7 @@ public void testCircuitBreakerWithManyGroups() { } try ( - PackedValuesBlockHash blockHash = new PackedValuesBlockHash(groupSpecs, blockFactory, 32); + PackedValuesBlockHash blockHash = new PackedValuesBlockHash(groupSpecs, blockFactory, bytesBreaker, 32); BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(1) ) { builder.appendBytesRef(new BytesRef("test")); @@ -76,8 +75,6 @@ public void close() {} }) ); assertThat(e.getMessage(), equalTo(ERROR_MESSAGE)); - } finally { - blockFactory.ensureAllBlocksAreReleased(); } } } From 3575999fd8b1679d0f83421aa2bfa5c83ad73380 Mon Sep 17 00:00:00 2001 From: Fang Xing Date: Thu, 8 Jan 2026 10:06:52 -0500 Subject: [PATCH 4/4] refactor PackedValuesBlockHash's constructor --- .../blockhash/PackedValuesBlockHash.java | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index d559af5d7f8de..8b45858a4e4e5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -68,24 +68,13 @@ final class PackedValuesBlockHash extends BlockHash { private final List specs; PackedValuesBlockHash(List specs, BlockFactory blockFactory, int emitBatchSize) { - super(blockFactory); - this.specs = specs; - this.emitBatchSize = emitBatchSize; - this.nullTrackingBytes = (specs.size() + 7) / 8; - boolean success = false; - try { - this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory); - this.bytes = new BreakingBytesRefBuilder(blockFactory.breaker(), "PackedValuesBlockHash", this.nullTrackingBytes); - success = true; - } finally { - // close bytesRefHash and bytes to prevent memory leaks in case of the initialization fails - if (success == false) { - close(); - } - } + this(specs, blockFactory, blockFactory.breaker(), emitBatchSize); } - // For circuit breaker testing only {@code PackedValuesBlockHashCircuitBreakerTests} + /* + * This constructor is also used by {@code PackedValuesBlockHashCircuitBreakerTests} to provide different circuit breakers + * to bytesRefHash and bytes. Production code should use the primary constructor above and provide same breaker for both. + */ PackedValuesBlockHash(List specs, BlockFactory blockFactory, CircuitBreaker circuitBreaker, int emitBatchSize) { super(blockFactory); this.specs = specs;