diff --git a/docs/changelog/140171.yaml b/docs/changelog/140171.yaml new file mode 100644 index 0000000000000..30637abba6dd7 --- /dev/null +++ b/docs/changelog/140171.yaml @@ -0,0 +1,6 @@ +pr: 140171 +summary: Converted `PackedValuesBlockHash.bytes` to `BreakingBytesRefBuilder` for + better memory tracking +area: ES|QL +type: enhancement +issues: [] diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java index 26cd234b0150a..8b45858a4e4e5 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHash.java @@ -8,7 +8,7 @@ package org.elasticsearch.compute.aggregation.blockhash; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.IntVector; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.compute.operator.mvdedupe.BatchEncoder; import org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupe; import org.elasticsearch.core.Releasable; @@ -63,16 +64,33 @@ final class PackedValuesBlockHash extends BlockHash { private final int emitBatchSize; private final BytesRefHashTable bytesRefHash; private final int nullTrackingBytes; - private final BytesRefBuilder bytes = new BytesRefBuilder(); + private final BreakingBytesRefBuilder bytes; private final List specs; 
PackedValuesBlockHash(List specs, BlockFactory blockFactory, int emitBatchSize) { + this(specs, blockFactory, blockFactory.breaker(), emitBatchSize); + } + + /* + * This constructor is also used by {@code PackedValuesBlockHashCircuitBreakerTests} to provide different circuit breakers + * to bytesRefHash and bytes. Production code should use the primary constructor above and provide the same breaker for both. + */ + PackedValuesBlockHash(List specs, BlockFactory blockFactory, CircuitBreaker circuitBreaker, int emitBatchSize) { super(blockFactory); this.specs = specs; this.emitBatchSize = emitBatchSize; - this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory); this.nullTrackingBytes = (specs.size() + 7) / 8; - bytes.grow(nullTrackingBytes); + boolean success = false; + try { + this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory); + this.bytes = new BreakingBytesRefBuilder(circuitBreaker, "PackedValuesBlockHash", this.nullTrackingBytes); + success = true; + } finally { + // close bytesRefHash and bytes to prevent memory leaks in case the initialization fails + if (success == false) { + close(); + } + } } @Override @@ -147,14 +165,14 @@ void add() { private void addSingleEntry() { fillBytesSv(groups); - appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get())))); + appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView())))); } private void addMultipleEntries() { int g = 0; do { fillBytesMv(groups, g); - appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get())))); + appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView())))); g = rewindKeys(groups); } while (g >= 0); finishMv(); @@ -216,7 +234,7 @@ public IntBlock next() { private void lookupSingleEntry(IntBlock.Builder ords) { fillBytesSv(groups); - long found = bytesRefHash.find(bytes.get()); + long found = bytesRefHash.find(bytes.bytesRefView()); if (found < 0) {
ords.appendNull(); } else { @@ -233,7 +251,7 @@ private void lookupMultipleEntries(IntBlock.Builder ords) { fillBytesMv(groups, g); // emit ords - long found = bytesRefHash.find(bytes.get()); + long found = bytesRefHash.find(bytes.bytesRefView()); if (found >= 0) { if (firstFound < 0) { firstFound = found; @@ -413,7 +431,7 @@ public BitArray seenGroupIds(BigArrays bigArrays) { @Override public void close() { - bytesRefHash.close(); + Releasables.close(bytesRefHash, bytes); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java index 5460210b688eb..a3a315fa7d1e4 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/mvdedupe/BatchEncoder.java @@ -19,6 +19,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.core.Releasable; import java.lang.invoke.MethodHandles; @@ -73,7 +74,7 @@ public static Decoder decoder(ElementType elementType) { * * @return the number of bytes has read */ - public abstract int read(int index, BytesRefBuilder dst); + public abstract int read(int index, BreakingBytesRefBuilder dst); /** * Encodes the next batch of entries. This will encode values until the next @@ -165,7 +166,7 @@ public final int valueCount(int positionOffset) { * no random-access way to get the first index for a position. 
*/ @Override - public final int read(int index, BytesRefBuilder dst) { + public final int read(int index, BreakingBytesRefBuilder dst) { int start = valueOffsets[index]; int length = valueOffsets[index + 1] - start; if (length > 0) { @@ -286,7 +287,7 @@ public final int valueCount(int positionOffset) { } @Override - public int read(int index, BytesRefBuilder dst) { + public int read(int index, BreakingBytesRefBuilder dst) { if (valueCount == 0) { assert index == 0 : index; return 0; @@ -296,7 +297,7 @@ public int read(int index, BytesRefBuilder dst) { } } - protected abstract int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst); + protected abstract int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst); @Override public final long ramBytesUsed() { @@ -366,7 +367,7 @@ protected static final class DirectInts extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Integer.BYTES; dst.grow(after); @@ -416,7 +417,7 @@ protected static final class DirectLongs extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Long.BYTES; dst.grow(after); @@ -484,7 +485,7 @@ protected static final class DirectDoubles extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { int before = dst.length(); int after = before + Double.BYTES; dst.grow(after); @@ -546,7 +547,7 @@ protected static final class DirectBooleans extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int 
readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { var v = ((BooleanBlock) block).getBoolean(valueIndex); dst.append((byte) (v ? 1 : 0)); return 1; @@ -613,7 +614,7 @@ protected static final class DirectBytesRefs extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { var v = ((BytesRefBlock) block).getBytesRef(valueIndex, scratch); int start = dst.length(); dst.grow(start + Integer.BYTES + v.length); @@ -654,7 +655,7 @@ protected static final class DirectNulls extends DirectEncoder { } @Override - protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) { + protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) { assert false : "all positions all nulls"; throw new IllegalStateException("all positions all nulls"); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java index 8d966a93bb6c2..74afc97757f54 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/BlockHashTestCase.java @@ -57,7 +57,7 @@ public void checkBreaker() { } // A breaker service that always returns the given breaker for getBreaker(CircuitBreaker.REQUEST) - private static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) { + static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) { CircuitBreakerService breakerService = mock(CircuitBreakerService.class); when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker); return breakerService; diff --git 
a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java new file mode 100644 index 0000000000000..2ad0627a60556 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/PackedValuesBlockHashCircuitBreakerTests.java @@ -0,0 +1,80 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation.blockhash; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BytesRefBlock; +import org.elasticsearch.compute.data.ElementType; +import org.elasticsearch.compute.data.IntArrayBlock; +import org.elasticsearch.compute.data.IntBigArrayBlock; +import org.elasticsearch.compute.data.IntVector; +import org.elasticsearch.compute.data.Page; + +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.common.util.MockBigArrays.ERROR_MESSAGE; +import static org.hamcrest.Matchers.equalTo; + +public class PackedValuesBlockHashCircuitBreakerTests extends BlockHashTestCase { + + /** + * Set the breaker limit low enough, 
and test that adding many (1000) groups of BYTES_REF into the bytes {@code BreakingBytesRefBuilder}, + * which is reused for each grouping set, will trigger a CBE. The CBE happens when adding around the 11th group to bytes. + */ + public void testCircuitBreakerWithManyGroups() { + CircuitBreaker bytesBreaker = new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofKb(1)); + BlockFactory blockFactory = BlockFactory.getInstance(new NoopCircuitBreaker("test"), BigArrays.NON_RECYCLING_INSTANCE); + + // 1000 group keys of BYTES_REF + List groupSpecs = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + groupSpecs.add(new BlockHash.GroupSpec(i, ElementType.BYTES_REF)); + } + + try ( + PackedValuesBlockHash blockHash = new PackedValuesBlockHash(groupSpecs, blockFactory, bytesBreaker, 32); + BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(1) + ) { + builder.appendBytesRef(new BytesRef("test")); + Block block = builder.build(); + Block[] blocks = new Block[1000]; + for (int i = 0; i < 1000; i++) { + blocks[i] = block; + } + Page page = new Page(blocks); + + CircuitBreakingException e = expectThrows( + CircuitBreakingException.class, + () -> blockHash.add(page, new GroupingAggregatorFunction.AddInput() { + @Override + public void add(int positionOffset, IntArrayBlock groupIds) {} + + @Override + public void add(int positionOffset, IntBigArrayBlock groupIds) {} + + @Override + public void add(int positionOffset, IntVector groupIds) {} + + @Override + public void close() {} + }) + ); + assertThat(e.getMessage(), equalTo(ERROR_MESSAGE)); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java index f518e1893f206..a658f52af325c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java +++
b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/mvdedupe/MultivalueDedupeTests.java @@ -10,8 +10,8 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.apache.lucene.util.BytesRef; -import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.NoopCircuitBreaker; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BytesRefHash; @@ -28,6 +28,7 @@ import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.operator.BreakingBytesRefBuilder; import org.elasticsearch.compute.test.BlockTestUtils; import org.elasticsearch.compute.test.RandomBlock; import org.elasticsearch.compute.test.TestBlockFactory; @@ -540,10 +541,11 @@ private int assertEncodedPosition(RandomBlock b, BatchEncoder encoder, int posit */ Block.Builder builder = elementType.newBlockBuilder(encoder.valueCount(offset), TestBlockFactory.getNonBreakingInstance()); BytesRef[] toDecode = new BytesRef[encoder.valueCount(offset)]; + CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST); for (int i = 0; i < toDecode.length; i++) { - BytesRefBuilder dest = new BytesRefBuilder(); + BreakingBytesRefBuilder dest = new BreakingBytesRefBuilder(breaker, "test"); encoder.read(valueOffset++, dest); - toDecode[i] = dest.toBytesRef(); + toDecode[i] = dest.bytesRefView(); if (b.values().get(position) == null) { // Nulls are encoded as 0 length values assertThat(toDecode[i].length, equalTo(0));