Merged
@@ -17,8 +17,13 @@
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntArrays;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;

/**
@@ -30,10 +35,12 @@ public class UnnestingPositionsAppender
private static final int INSTANCE_SIZE = instanceSize(UnnestingPositionsAppender.class);

private final PositionsAppender delegate;
private DictionaryBlockBuilder dictionaryBlockBuilder;
Member:
Keep state top-level as in io.trino.operator.output.RleAwarePositionsAppender

Member (Author):
Since this class has now two responsibilities (unnesting and building a dictionary) it's more readable to separate them. It also makes it easier to reset the state of the appender.

Member (@sopel39), Nov 15, 2022:
> Since this class has now two responsibilities (unnesting and building a dictionary)

This class doesn't really unnest much now. RleAwarePositionsAppender could do:

        if (source instanceof RunLengthEncodedBlock) {
            delegate.appendRle(((RunLengthEncodedBlock) source).getValue(), positions.size());
        }

itself at this point, really, so UnnestingPositionsAppender would be all about dictionaries.

Member (Author):
RleAwarePositionsAppender is not always there. The unnesting part is actually there to make sure only flat blocks are passed down to the flat appenders.
Maybe we should merge UnnestingPositionsAppender and RleAwarePositionsAppender into BlockTypeAwarePositionsAppender? Although I fear it would make the code messier.

Member:
> RleAwarePositionsAppender is not always there.

Why wouldn't it be always there?

Member (Author):
it's not needed if the type is not comparable

Member (@sopel39), Nov 15, 2022:
> it's not needed if the type is not comparable

I don't think it's worth the extra complexity, since there are not many types like that.

However, you could then have a minimal UnnestingPositionsAppender without RLE or dictionary-builder support.

I don't think mixing the current UnnestingPositionsAppender with dictionary awareness is needed.
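The dispatch being debated in this thread can be sketched in isolation. This is an illustrative model only: `Rle`, `Dict`, and plain `int[]` are simplified stand-ins for Trino's RunLengthEncodedBlock, DictionaryBlock, and flat Block types, and `unwrap` plays the role of the unnesting step that guarantees the delegate appender only ever sees flat data.

```java
// Simplified stand-ins for Trino's encoded block types (assumptions, not the real API).
record Rle(int value, int count) {}
record Dict(int[] dictionary, int[] ids) {}

// Unwrap encoded blocks so only flat values reach the delegate appender.
static int[] unwrap(Object source) {
    if (source instanceof Rle rle) {
        // RLE: expand the single repeated value.
        int[] flat = new int[rle.count()];
        java.util.Arrays.fill(flat, rle.value());
        return flat;
    }
    if (source instanceof Dict dict) {
        // Dictionary: resolve each id through the dictionary.
        int[] flat = new int[dict.ids().length];
        for (int i = 0; i < flat.length; i++) {
            flat[i] = dict.dictionary()[dict.ids()[i]];
        }
        return flat;
    }
    return (int[]) source; // already flat
}
```

The real appender avoids this eager materialization for dictionaries (that is what the DictionaryBlockBuilder below is for), but the flat-only invariant toward the delegate is the same.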


public UnnestingPositionsAppender(PositionsAppender delegate)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.dictionaryBlockBuilder = new DictionaryBlockBuilder();
}

@Override
@@ -43,12 +50,14 @@ public void append(IntArrayList positions, Block source)
return;
}
if (source instanceof RunLengthEncodedBlock) {
dictionaryBlockBuilder.flushDictionary(delegate);
delegate.appendRle(((RunLengthEncodedBlock) source).getValue(), positions.size());
}
else if (source instanceof DictionaryBlock) {
appendDictionary(positions, (DictionaryBlock) source);
}
else {
dictionaryBlockBuilder.flushDictionary(delegate);
delegate.append(positions, source);
}
}
@@ -59,12 +68,15 @@ public void appendRle(Block block, int rlePositionCount)
if (rlePositionCount == 0) {
return;
}
dictionaryBlockBuilder.flushDictionary(delegate);
delegate.appendRle(block, rlePositionCount);
}

@Override
public void append(int position, Block source)
{
dictionaryBlockBuilder.flushDictionary(delegate);
Member:
Why is the dictionary flushed in every case here, but in the other append it is flushed only for non-dictionary cases?

Member (Author):
I think we flush in every case other than when we have the same dictionary (dictionaryBlockBuilder.canAppend, to be precise).
We could also do it here, at the cost of evaluating !closed && (dictionary == this.dictionary || this.dictionary == null) for every row. I will try to benchmark how much this impacts the performance of row-by-row processing.


if (source instanceof RunLengthEncodedBlock runLengthEncodedBlock) {
delegate.append(0, runLengthEncodedBlock.getValue());
}
@@ -79,13 +91,21 @@ else if (source instanceof DictionaryBlock dictionaryBlock) {
@Override
public Block build()
{
return delegate.build();
Block result;
if (dictionaryBlockBuilder.isEmpty()) {
result = delegate.build();
}
else {
result = dictionaryBlockBuilder.build();
}
dictionaryBlockBuilder = dictionaryBlockBuilder.newBuilderLike();
return result;
}

@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE + delegate.getRetainedSizeInBytes();
return INSTANCE_SIZE + delegate.getRetainedSizeInBytes() + dictionaryBlockBuilder.getRetainedSizeInBytes();
}

@Override
@@ -96,15 +116,114 @@ public long getSizeInBytes()

private void appendDictionary(IntArrayList positions, DictionaryBlock source)
{
delegate.append(mapPositions(positions, source), source.getDictionary());
Block dictionary = source.getDictionary();
IntArrayList dictionaryPositions = getDictionaryPositions(positions, source);
if (dictionaryBlockBuilder.canAppend(dictionary)) {
dictionaryBlockBuilder.append(dictionaryPositions, dictionary);
}
else {
dictionaryBlockBuilder.flushDictionary(delegate);
delegate.append(dictionaryPositions, dictionary);
}
}

private IntArrayList mapPositions(IntArrayList positions, DictionaryBlock block)
private IntArrayList getDictionaryPositions(IntArrayList positions, DictionaryBlock block)
{
int[] positionArray = new int[positions.size()];
for (int i = 0; i < positions.size(); i++) {
positionArray[i] = block.getId(positions.getInt(i));
}
return IntArrayList.wrap(positionArray);
}

private static class DictionaryBlockBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryBlockBuilder.class);
private final int initialEntryCount;
private Block dictionary;
private int[] dictionaryIds;
private int positionCount;
private boolean closed;

public DictionaryBlockBuilder()
{
this(1024);
}

public DictionaryBlockBuilder(int initialEntryCount)
{
this.initialEntryCount = initialEntryCount;
this.dictionaryIds = new int[0];
}

public boolean isEmpty()
{
return positionCount == 0;
}

public Block build()
{
return DictionaryBlock.create(positionCount, dictionary, dictionaryIds);
}

public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE
+ (long) dictionaryIds.length * Integer.BYTES
+ (dictionary != null ? dictionary.getRetainedSizeInBytes() : 0);
}

public boolean canAppend(Block dictionary)
{
return !closed && (dictionary == this.dictionary || this.dictionary == null);
}

public void append(IntArrayList mappedPositions, Block dictionary)
{
checkArgument(canAppend(dictionary));
this.dictionary = dictionary;
ensureCapacity(positionCount + mappedPositions.size());
System.arraycopy(mappedPositions.elements(), 0, dictionaryIds, positionCount, mappedPositions.size());
positionCount += mappedPositions.size();
}

public void flushDictionary(PositionsAppender delegate)
{
if (closed) {
return;
}
if (positionCount > 0) {
requireNonNull(dictionary, () -> "dictionary is null but we have pending dictionaryIds " + positionCount);
delegate.append(IntArrayList.wrap(dictionaryIds, positionCount), dictionary);
}

closed = true;
dictionaryIds = new int[0];
positionCount = 0;
dictionary = null;
}

public DictionaryBlockBuilder newBuilderLike()
{
return new DictionaryBlockBuilder(max(calculateBlockResetSize(positionCount), initialEntryCount));
}

private void ensureCapacity(int capacity)
{
if (dictionaryIds.length >= capacity) {
return;
}

int newSize;
if (dictionaryIds.length > 0) {
newSize = calculateNewArraySize(dictionaryIds.length);
}
else {
newSize = initialEntryCount;
}
newSize = Math.max(newSize, capacity);
Member:
Is there any possibility that newSize will be lower than capacity?

Member (Author):
Well, yes; e.g., capacity can be bigger than initialEntryCount, or bigger than 1.5 * dictionaryIds.length (what calculateNewArraySize returns).

Member:
Why not just do int newSize = calculateNewArraySize(max(initialEntryCount, capacity, dictionaryIds.length))?

Of course, this will work differently than now: if capacity is 100, it will create an array of size 150.

Member (Author):
We do not want to go over initialEntryCount, as the appender can be "pre-sized" in #reset.
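The sizing rule this thread discusses can be modeled in isolation. Assumption: calculateNewArraySize grows the array by roughly 50%, as the comments above suggest; the real Trino helper may differ in detail.

```java
// Sketch of ensureCapacity's sizing: grow an existing array by ~1.5x, start new
// arrays at initialEntryCount, and always honor the requested capacity.
static int newArraySize(int currentLength, int initialEntryCount, int capacity) {
    int newSize = currentLength > 0
            ? currentLength + (currentLength >> 1)  // assumed 1.5x growth factor
            : initialEntryCount;                    // pre-sized initial allocation
    return Math.max(newSize, capacity);             // capacity can exceed both, hence the max
}
```

The trailing max is exactly the line questioned above: both the 1.5x growth and initialEntryCount can come out smaller than the requested capacity, so the clamp is required.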


dictionaryIds = IntArrays.ensureCapacity(dictionaryIds, newSize, positionCount);
}
}
}
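Taken together, the DictionaryBlockBuilder above is a small state machine: accumulate ids while the same dictionary instance keeps arriving, otherwise flush. A minimal standalone model, with plain int arrays standing in for Trino's Block type and identity comparison as in canAppend (an illustrative sketch, not the real class):

```java
// Minimal model of DictionaryBlockBuilder: merge batches that share a dictionary
// instance; build() materializes the accumulated ids through that dictionary.
final class MiniDictionaryBuilder {
    private int[] dictionary;          // current dictionary, null until first append
    private int[] ids = new int[0];    // accumulated dictionary ids
    private int positionCount;
    private boolean closed;

    boolean canAppend(int[] dictionary) {
        // Only batches reusing the exact same dictionary instance can be merged.
        return !closed && (dictionary == this.dictionary || this.dictionary == null);
    }

    void append(int[] positions, int[] dictionary) {
        if (!canAppend(dictionary)) {
            throw new IllegalArgumentException("dictionary changed; flush first");
        }
        this.dictionary = dictionary;
        ids = java.util.Arrays.copyOf(ids, positionCount + positions.length);
        System.arraycopy(positions, 0, ids, positionCount, positions.length);
        positionCount += positions.length;
    }

    int[] build() {
        // Resolve accumulated ids through the shared dictionary.
        int[] result = new int[positionCount];
        for (int i = 0; i < positionCount; i++) {
            result[i] = dictionary[ids[i]];
        }
        return result;
    }
}
```

In the real class, build() instead returns a DictionaryBlock that keeps the indirection, and a flush hands the pending ids to the delegate appender; the canAppend gate is the same.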
@@ -57,6 +57,8 @@
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.SizeOf.instanceSize;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.airlift.slice.Slices.EMPTY_SLICE;
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
import static io.airlift.testing.Assertions.assertInstanceOf;
@@ -231,6 +233,69 @@ public void testRleAppendedWithSinglePositionDoesNotProduceRle(TestType type)
assertFalse(actual instanceof RunLengthEncodedBlock, actual.getClass().getSimpleName());
}

@Test(dataProvider = "types")
public void testMultipleTheSameDictionariesProduceDictionary(TestType type)
{
PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);

testMultipleTheSameDictionariesProduceDictionary(type, positionsAppender);
// test if appender can accept different dictionary after a build
testMultipleTheSameDictionariesProduceDictionary(type, positionsAppender);
}

private void testMultipleTheSameDictionariesProduceDictionary(TestType type, PositionsAppender positionsAppender)
{
Block dictionary = createRandomBlockForType(type, 4, 0);
positionsAppender.append(allPositions(3), createRandomDictionaryBlock(dictionary, 3));
positionsAppender.append(allPositions(2), createRandomDictionaryBlock(dictionary, 2));

Block actual = positionsAppender.build();
assertEquals(actual.getPositionCount(), 5);
assertInstanceOf(actual, DictionaryBlock.class);
assertEquals(((DictionaryBlock) actual).getDictionary(), dictionary);
}

@Test(dataProvider = "types")
public void testDictionarySwitchToFlat(TestType type)
{
List<BlockView> inputs = ImmutableList.of(
input(dictionaryBlock(type, 3, 4, 0), 0, 1),
input(notNullBlock(type, 2), 0, 1));
testAppend(type, inputs);
}

@Test(dataProvider = "types")
public void testFlatAppendDictionary(TestType type)
{
List<BlockView> inputs = ImmutableList.of(
input(notNullBlock(type, 2), 0, 1),
input(dictionaryBlock(type, 3, 4, 0), 0, 1));
testAppend(type, inputs);
}

@Test(dataProvider = "types")
public void testDictionaryAppendDifferentDictionary(TestType type)
{
List<BlockView> dictionaryInputs = ImmutableList.of(
input(dictionaryBlock(type, 3, 4, 0), 0, 1),
input(dictionaryBlock(type, 2, 4, 0), 0, 1));
testAppend(type, dictionaryInputs);
}

@Test(dataProvider = "types")
public void testDictionarySingleThenFlat(TestType type)
{
BlockView firstInput = input(dictionaryBlock(type, 1, 4, 0), 0);
BlockView secondInput = input(dictionaryBlock(type, 2, 4, 0), 0, 1);
PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
long initialRetainedSize = positionsAppender.getRetainedSizeInBytes();

firstInput.getPositions().forEach((int position) -> positionsAppender.append(position, firstInput.getBlock()));
positionsAppender.append(secondInput.getPositions(), secondInput.getBlock());

assertBuildResult(type, ImmutableList.of(firstInput, secondInput), positionsAppender, initialRetainedSize);
}

@Test(dataProvider = "types")
public void testConsecutiveBuilds(TestType type)
{
@@ -262,6 +327,11 @@ public void testConsecutiveBuilds(TestType type)
positionsAppender.append(allPositions(10), nullRleBlock);
assertBlockEquals(type.getType(), positionsAppender.build(), nullRleBlock);

// append dictionary
Block dictionaryBlock = dictionaryBlock(type, 10, 5, 0);
positionsAppender.append(allPositions(10), dictionaryBlock);
assertBlockEquals(type.getType(), positionsAppender.build(), dictionaryBlock);

// just build to confirm appender was reset
assertEquals(positionsAppender.build().getPositionCount(), 0);
}
@@ -445,6 +515,11 @@ private void testAppendBatch(TestType type, List<BlockView> inputs)
long initialRetainedSize = positionsAppender.getRetainedSizeInBytes();

inputs.forEach(input -> positionsAppender.append(input.getPositions(), input.getBlock()));
assertBuildResult(type, inputs, positionsAppender, initialRetainedSize);
}

private void assertBuildResult(TestType type, List<BlockView> inputs, PositionsAppender positionsAppender, long initialRetainedSize)
{
long sizeInBytes = positionsAppender.getSizeInBytes();
assertGreaterThanOrEqual(positionsAppender.getRetainedSizeInBytes(), sizeInBytes);
Block actual = positionsAppender.build();
@@ -569,6 +644,7 @@ public void appendTo(PositionsAppender positionsAppender)
private static class TestVariableWidthBlock
extends AbstractVariableWidthBlock
{
private static final int INSTANCE_SIZE = instanceSize(VariableWidthBlock.class);
private final int arrayOffset;
private final int positionCount;
private final Slice slice;
@@ -694,7 +770,7 @@ public long getPositionsSizeInBytes(boolean[] positions, int selectedPositionsCo
@Override
public long getRetainedSizeInBytes()
{
throw new UnsupportedOperationException();
return INSTANCE_SIZE + slice.getRetainedSize() + sizeOf(valueIsNull) + sizeOf(offsets);
}

@Override
@@ -19,8 +19,6 @@

import java.util.Optional;

import static io.trino.spi.block.DictionaryBlock.createProjectedDictionaryBlock;

public class DictionaryBlockEncoding
implements BlockEncoding
{
@@ -52,11 +50,6 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO

// ids
sliceOutput.writeBytes(dictionaryBlock.getIds());

// instance id
sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getMostSignificantBits());
sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getLeastSignificantBits());
sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getSequenceId());
}

@Override
@@ -72,15 +65,8 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn
int[] ids = new int[positionCount];
sliceInput.readBytes(Slices.wrappedIntArray(ids));

// instance id
long mostSignificantBits = sliceInput.readLong();
long leastSignificantBits = sliceInput.readLong();
long sequenceId = sliceInput.readLong();

// We always compact the dictionary before we send it. However, dictionaryBlock comes from sliceInput, which may over-retain memory.
// As a result, setting dictionaryIsCompacted to true is not appropriate here.
// TODO: fix DictionaryBlock so that dictionaryIsCompacted can be set to true when the underlying block over-retains memory.
return createProjectedDictionaryBlock(positionCount, dictionaryBlock, ids, new DictionaryId(mostSignificantBits, leastSignificantBits, sequenceId));
// flatten the dictionary
return dictionaryBlock.copyPositions(ids, 0, ids.length);
}

@Override
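The new readBlock path above no longer reconstructs a DictionaryBlock: copyPositions(ids, 0, ids.length) materializes the referenced dictionary entries instead. Conceptually, with plain arrays standing in for Trino's Block API (an illustrative sketch, not the real implementation):

```java
// Flattening on read: resolve every id through the dictionary, producing a
// flat copy that shares no dictionary reference with the serialized form.
static String[] flattenOnRead(String[] dictionary, int[] ids) {
    String[] flat = new String[ids.length];
    for (int i = 0; i < ids.length; i++) {
        flat[i] = dictionary[ids[i]];
    }
    return flat;
}
```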
@@ -18,7 +18,6 @@

import static io.trino.spi.block.BlockTestUtils.assertBlockEquals;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

public class TestDictionaryBlockEncoding
@@ -40,13 +39,7 @@ public void testRoundTrip()
DictionaryBlock dictionaryBlock = (DictionaryBlock) DictionaryBlock.create(ids.length, dictionary, ids);

Block actualBlock = roundTripBlock(dictionaryBlock);
assertTrue(actualBlock instanceof DictionaryBlock);
DictionaryBlock actualDictionaryBlock = (DictionaryBlock) actualBlock;
assertBlockEquals(VARCHAR, actualDictionaryBlock.getDictionary(), dictionary);
for (int position = 0; position < actualDictionaryBlock.getPositionCount(); position++) {
assertEquals(actualDictionaryBlock.getId(position), ids[position]);
}
assertEquals(actualDictionaryBlock.getDictionarySourceId(), dictionaryBlock.getDictionarySourceId());
assertBlockEquals(VARCHAR, actualBlock, dictionaryBlock);
}

@Test
@@ -56,7 +49,6 @@ public void testNonSequentialDictionaryUnnest()
DictionaryBlock dictionaryBlock = (DictionaryBlock) DictionaryBlock.create(ids.length, dictionary, ids);

Block actualBlock = roundTripBlock(dictionaryBlock);
assertTrue(actualBlock instanceof DictionaryBlock);
assertBlockEquals(VARCHAR, actualBlock, dictionary.getPositions(ids, 0, 4));
}
