Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/140171.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 140171
summary: Converted `PackedValuesBlockHash.bytes` to `BreakingBytesRefBuilder` for
better memory tracking
area: ES|QL
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
Expand All @@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.compute.operator.mvdedupe.BatchEncoder;
import org.elasticsearch.compute.operator.mvdedupe.MultivalueDedupe;
import org.elasticsearch.core.Releasable;
Expand Down Expand Up @@ -63,16 +64,33 @@ final class PackedValuesBlockHash extends BlockHash {
private final int emitBatchSize;
private final BytesRefHashTable bytesRefHash;
private final int nullTrackingBytes;
private final BytesRefBuilder bytes = new BytesRefBuilder();
private final BreakingBytesRefBuilder bytes;
private final List<GroupSpec> specs;

PackedValuesBlockHash(List<GroupSpec> specs, BlockFactory blockFactory, int emitBatchSize) {
this(specs, blockFactory, blockFactory.breaker(), emitBatchSize);
}

/*
* This constructor is also used by {@code PackedValuesBlockHashCircuitBreakerTests} to provide different circuit breakers
* to bytesRefHash and bytes. Production code should use the primary constructor above and provide same breaker for both.
*/
PackedValuesBlockHash(List<GroupSpec> specs, BlockFactory blockFactory, CircuitBreaker circuitBreaker, int emitBatchSize) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary ctor can call this one like this(specs, blockFactory, blockFactory.circuitBreaker(), emitBatchSize).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! More refactor is needed. :)

super(blockFactory);
this.specs = specs;
this.emitBatchSize = emitBatchSize;
this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory);
this.nullTrackingBytes = (specs.size() + 7) / 8;
bytes.grow(nullTrackingBytes);
boolean success = false;
try {
this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory);
this.bytes = new BreakingBytesRefBuilder(circuitBreaker, "PackedValuesBlockHash", this.nullTrackingBytes);
success = true;
} finally {
// close bytesRefHash and bytes to prevent memory leaks in case initialization fails
if (success == false) {
close();
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is valid and fine, but we usually write this as:

boolean success = false;
        try {
            this.bytesRefHash = HashImplFactory.newBytesRefHash(blockFactory);
            this.bytes = new BreakingBytesRefBuilder(blockFactory.breaker(), "PackedValuesBlockHash", this.nullTrackingBytes);
         } finally {
if (success == false) {
  close();
}

Mostly out of paranoia around eating the stack trace for e. This is just the more "normal" way of writing this for us.

}

@Override
Expand Down Expand Up @@ -147,14 +165,14 @@ void add() {

private void addSingleEntry() {
fillBytesSv(groups);
appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
appendOrdSv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView()))));
}

private void addMultipleEntries() {
int g = 0;
do {
fillBytesMv(groups, g);
appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.get()))));
appendOrdInMv(position, Math.toIntExact(hashOrdToGroup(bytesRefHash.add(bytes.bytesRefView()))));
g = rewindKeys(groups);
} while (g >= 0);
finishMv();
Expand Down Expand Up @@ -216,7 +234,7 @@ public IntBlock next() {

private void lookupSingleEntry(IntBlock.Builder ords) {
fillBytesSv(groups);
long found = bytesRefHash.find(bytes.get());
long found = bytesRefHash.find(bytes.bytesRefView());
if (found < 0) {
ords.appendNull();
} else {
Expand All @@ -233,7 +251,7 @@ private void lookupMultipleEntries(IntBlock.Builder ords) {
fillBytesMv(groups, g);

// emit ords
long found = bytesRefHash.find(bytes.get());
long found = bytesRefHash.find(bytes.bytesRefView());
if (found >= 0) {
if (firstFound < 0) {
firstFound = found;
Expand Down Expand Up @@ -413,7 +431,7 @@ public BitArray seenGroupIds(BigArrays bigArrays) {

@Override
public void close() {
bytesRefHash.close();
Releasables.close(bytesRefHash, bytes);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.core.Releasable;

import java.lang.invoke.MethodHandles;
Expand Down Expand Up @@ -73,7 +74,7 @@ public static Decoder decoder(ElementType elementType) {
*
* @return the number of bytes has read
*/
public abstract int read(int index, BytesRefBuilder dst);
public abstract int read(int index, BreakingBytesRefBuilder dst);

/**
* Encodes the next batch of entries. This will encode values until the next
Expand Down Expand Up @@ -165,7 +166,7 @@ public final int valueCount(int positionOffset) {
* no random-access way to get the first index for a position.
*/
@Override
public final int read(int index, BytesRefBuilder dst) {
public final int read(int index, BreakingBytesRefBuilder dst) {
int start = valueOffsets[index];
int length = valueOffsets[index + 1] - start;
if (length > 0) {
Expand Down Expand Up @@ -286,7 +287,7 @@ public final int valueCount(int positionOffset) {
}

@Override
public int read(int index, BytesRefBuilder dst) {
public int read(int index, BreakingBytesRefBuilder dst) {
if (valueCount == 0) {
assert index == 0 : index;
return 0;
Expand All @@ -296,7 +297,7 @@ public int read(int index, BytesRefBuilder dst) {
}
}

protected abstract int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst);
protected abstract int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst);

@Override
public final long ramBytesUsed() {
Expand Down Expand Up @@ -366,7 +367,7 @@ protected static final class DirectInts extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
int before = dst.length();
int after = before + Integer.BYTES;
dst.grow(after);
Expand Down Expand Up @@ -416,7 +417,7 @@ protected static final class DirectLongs extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
int before = dst.length();
int after = before + Long.BYTES;
dst.grow(after);
Expand Down Expand Up @@ -484,7 +485,7 @@ protected static final class DirectDoubles extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
int before = dst.length();
int after = before + Double.BYTES;
dst.grow(after);
Expand Down Expand Up @@ -546,7 +547,7 @@ protected static final class DirectBooleans extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
var v = ((BooleanBlock) block).getBoolean(valueIndex);
dst.append((byte) (v ? 1 : 0));
return 1;
Expand Down Expand Up @@ -613,7 +614,7 @@ protected static final class DirectBytesRefs extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
var v = ((BytesRefBlock) block).getBytesRef(valueIndex, scratch);
int start = dst.length();
dst.grow(start + Integer.BYTES + v.length);
Expand Down Expand Up @@ -654,7 +655,7 @@ protected static final class DirectNulls extends DirectEncoder {
}

@Override
protected int readValueAtBlockIndex(int valueIndex, BytesRefBuilder dst) {
protected int readValueAtBlockIndex(int valueIndex, BreakingBytesRefBuilder dst) {
assert false : "all positions all nulls";
throw new IllegalStateException("all positions all nulls");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void checkBreaker() {
}

// A breaker service that always returns the given breaker for getBreaker(CircuitBreaker.REQUEST)
private static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) {
static CircuitBreakerService mockBreakerService(CircuitBreaker breaker) {
CircuitBreakerService breakerService = mock(CircuitBreakerService.class);
when(breakerService.getBreaker(CircuitBreaker.REQUEST)).thenReturn(breaker);
return breakerService;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.aggregation.blockhash;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BytesRefBlock;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntArrayBlock;
import org.elasticsearch.compute.data.IntBigArrayBlock;
import org.elasticsearch.compute.data.IntVector;
import org.elasticsearch.compute.data.Page;

import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.common.util.MockBigArrays.ERROR_MESSAGE;
import static org.hamcrest.Matchers.equalTo;

public class PackedValuesBlockHashCircuitBreakerTests extends BlockHashTestCase {

    /**
     * Sets the breaker limit low enough that encoding many (1000) BYTES_REF group keys into the
     * {@code bytes} {@code BreakingBytesRefBuilder}, which is reused for each grouping set, triggers a
     * {@link CircuitBreakingException}. The CBE is expected to fire around the 11th key appended to {@code bytes}.
     */
    public void testCircuitBreakerWithManyGroups() {
        // 1 KB limit applies only to the bytes builder; the block factory uses a no-op breaker
        // so that any trip is attributable to the BreakingBytesRefBuilder, not block allocation.
        CircuitBreaker bytesBreaker = new MockBigArrays.LimitedBreaker(CircuitBreaker.REQUEST, ByteSizeValue.ofKb(1));
        BlockFactory blockFactory = BlockFactory.getInstance(new NoopCircuitBreaker("test"), BigArrays.NON_RECYCLING_INSTANCE);

        // 1000 group keys of BYTES_REF
        List<BlockHash.GroupSpec> groupSpecs = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            groupSpecs.add(new BlockHash.GroupSpec(i, ElementType.BYTES_REF));
        }

        try (
            // Uses the test-only constructor that accepts a distinct breaker for bytes.
            PackedValuesBlockHash blockHash = new PackedValuesBlockHash(groupSpecs, blockFactory, bytesBreaker, 32);
            BytesRefBlock.Builder builder = blockFactory.newBytesRefBlockBuilder(1)
        ) {
            builder.appendBytesRef(new BytesRef("test"));
            Block block = builder.build();
            // The same single-value block backs all 1000 group channels.
            Block[] blocks = new Block[1000];
            for (int i = 0; i < 1000; i++) {
                blocks[i] = block;
            }
            Page page = new Page(blocks);

            CircuitBreakingException e = expectThrows(
                CircuitBreakingException.class,
                // No-op sink: we only care that add() trips the breaker while packing keys.
                () -> blockHash.add(page, new GroupingAggregatorFunction.AddInput() {
                    @Override
                    public void add(int positionOffset, IntArrayBlock groupIds) {}

                    @Override
                    public void add(int positionOffset, IntBigArrayBlock groupIds) {}

                    @Override
                    public void add(int positionOffset, IntVector groupIds) {}

                    @Override
                    public void close() {}
                })
            );
            // MockBigArrays.LimitedBreaker raises with this fixed message.
            assertThat(e.getMessage(), equalTo(ERROR_MESSAGE));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefHash;
Expand All @@ -28,6 +28,7 @@
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.operator.BreakingBytesRefBuilder;
import org.elasticsearch.compute.test.BlockTestUtils;
import org.elasticsearch.compute.test.RandomBlock;
import org.elasticsearch.compute.test.TestBlockFactory;
Expand Down Expand Up @@ -540,10 +541,11 @@ private int assertEncodedPosition(RandomBlock b, BatchEncoder encoder, int posit
*/
Block.Builder builder = elementType.newBlockBuilder(encoder.valueCount(offset), TestBlockFactory.getNonBreakingInstance());
BytesRef[] toDecode = new BytesRef[encoder.valueCount(offset)];
CircuitBreaker breaker = new NoopCircuitBreaker(CircuitBreaker.REQUEST);
for (int i = 0; i < toDecode.length; i++) {
BytesRefBuilder dest = new BytesRefBuilder();
BreakingBytesRefBuilder dest = new BreakingBytesRefBuilder(breaker, "test");
encoder.read(valueOffset++, dest);
toDecode[i] = dest.toBytesRef();
toDecode[i] = dest.bytesRefView();
if (b.values().get(position) == null) {
// Nulls are encoded as 0 length values
assertThat(toDecode[i].length, equalTo(0));
Expand Down