diff --git a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java index d70145c9c4b6..c56f02b4c754 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java @@ -17,8 +17,13 @@ import io.trino.spi.block.DictionaryBlock; import io.trino.spi.block.RunLengthEncodedBlock; import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntArrays; +import static com.google.common.base.Preconditions.checkArgument; import static io.airlift.slice.SizeOf.instanceSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize; +import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize; +import static java.lang.Math.max; import static java.util.Objects.requireNonNull; /** @@ -30,10 +35,12 @@ public class UnnestingPositionsAppender private static final int INSTANCE_SIZE = instanceSize(UnnestingPositionsAppender.class); private final PositionsAppender delegate; + private DictionaryBlockBuilder dictionaryBlockBuilder; public UnnestingPositionsAppender(PositionsAppender delegate) { this.delegate = requireNonNull(delegate, "delegate is null"); + this.dictionaryBlockBuilder = new DictionaryBlockBuilder(); } @Override @@ -43,12 +50,14 @@ public void append(IntArrayList positions, Block source) return; } if (source instanceof RunLengthEncodedBlock) { + dictionaryBlockBuilder.flushDictionary(delegate); delegate.appendRle(((RunLengthEncodedBlock) source).getValue(), positions.size()); } else if (source instanceof DictionaryBlock) { appendDictionary(positions, (DictionaryBlock) source); } else { + dictionaryBlockBuilder.flushDictionary(delegate); delegate.append(positions, source); } } @@ -59,12 +68,15 @@ public void appendRle(Block block, int rlePositionCount) if (rlePositionCount == 0) { return; } + dictionaryBlockBuilder.flushDictionary(delegate); delegate.appendRle(block, rlePositionCount); } @Override public void append(int position, Block source) { + dictionaryBlockBuilder.flushDictionary(delegate); + if (source instanceof RunLengthEncodedBlock runLengthEncodedBlock) { delegate.append(0, runLengthEncodedBlock.getValue()); } @@ -79,13 +91,21 @@ else if (source instanceof DictionaryBlock dictionaryBlock) { @Override public Block build() { - return delegate.build(); + Block result; + if (dictionaryBlockBuilder.isEmpty()) { + result = delegate.build(); + } + else { + result = dictionaryBlockBuilder.build(); + } + dictionaryBlockBuilder = dictionaryBlockBuilder.newBuilderLike(); + return result; } @Override public long getRetainedSizeInBytes() { - return INSTANCE_SIZE + delegate.getRetainedSizeInBytes(); + return INSTANCE_SIZE + delegate.getRetainedSizeInBytes() + dictionaryBlockBuilder.getRetainedSizeInBytes(); } @Override @@ -96,10 +116,18 @@ public long getSizeInBytes() private void appendDictionary(IntArrayList positions, DictionaryBlock source) { - delegate.append(mapPositions(positions, source), source.getDictionary()); + Block dictionary = source.getDictionary(); + IntArrayList dictionaryPositions = getDictionaryPositions(positions, source); + if (dictionaryBlockBuilder.canAppend(dictionary)) { + dictionaryBlockBuilder.append(dictionaryPositions, dictionary); + } + else { + dictionaryBlockBuilder.flushDictionary(delegate); + delegate.append(dictionaryPositions, dictionary); + } } - private IntArrayList mapPositions(IntArrayList positions, DictionaryBlock block) + private IntArrayList getDictionaryPositions(IntArrayList positions, DictionaryBlock block) { int[] positionArray = new int[positions.size()]; for (int i = 0; i < positions.size(); i++) { @@ -107,4 +135,95 @@ private IntArrayList mapPositions(IntArrayList positions, DictionaryBlock block) } return IntArrayList.wrap(positionArray); } + + private static class DictionaryBlockBuilder + { + private static final int INSTANCE_SIZE = instanceSize(DictionaryBlockBuilder.class); + private final int initialEntryCount; + private Block dictionary; + private int[] dictionaryIds; + private int positionCount; + private boolean closed; + + public DictionaryBlockBuilder() + { + this(1024); + } + + public DictionaryBlockBuilder(int initialEntryCount) + { + this.initialEntryCount = initialEntryCount; + this.dictionaryIds = new int[0]; + } + + public boolean isEmpty() + { + return positionCount == 0; + } + + public Block build() + { + return DictionaryBlock.create(positionCount, dictionary, dictionaryIds); + } + + public long getRetainedSizeInBytes() + { + return INSTANCE_SIZE + + (long) dictionaryIds.length * Integer.BYTES + + (dictionary != null ? dictionary.getRetainedSizeInBytes() : 0); + } + + public boolean canAppend(Block dictionary) + { + return !closed && (dictionary == this.dictionary || this.dictionary == null); + } + + public void append(IntArrayList mappedPositions, Block dictionary) + { + checkArgument(canAppend(dictionary)); + this.dictionary = dictionary; + ensureCapacity(positionCount + mappedPositions.size()); + System.arraycopy(mappedPositions.elements(), 0, dictionaryIds, positionCount, mappedPositions.size()); + positionCount += mappedPositions.size(); + } + + public void flushDictionary(PositionsAppender delegate) + { + if (closed) { + return; + } + if (positionCount > 0) { + requireNonNull(dictionary, () -> "dictionary is null but we have pending dictionaryIds " + positionCount); + delegate.append(IntArrayList.wrap(dictionaryIds, positionCount), dictionary); + } + + closed = true; + dictionaryIds = new int[0]; + positionCount = 0; + dictionary = null; + } + + public DictionaryBlockBuilder newBuilderLike() + { + return new DictionaryBlockBuilder(max(calculateBlockResetSize(positionCount), initialEntryCount)); + } + + private void ensureCapacity(int capacity) + { + if (dictionaryIds.length >= capacity) { + return; + } + + int newSize; + if (dictionaryIds.length > 0) { + newSize = calculateNewArraySize(dictionaryIds.length); + } + else { + newSize = initialEntryCount; + } + newSize = Math.max(newSize, capacity); + + dictionaryIds = IntArrays.ensureCapacity(dictionaryIds, newSize, positionCount); + } + } } diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java index 98fd492f3ea1..f74877be330c 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java @@ -57,6 +57,8 @@ import java.util.stream.IntStream; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.instanceSize; +import static io.airlift.slice.SizeOf.sizeOf; import static io.airlift.slice.Slices.EMPTY_SLICE; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.testing.Assertions.assertInstanceOf; @@ -231,6 +233,69 @@ public void testRleAppendedWithSinglePositionDoesNotProduceRle(TestType type) assertFalse(actual instanceof RunLengthEncodedBlock, actual.getClass().getSimpleName()); } + @Test(dataProvider = "types") + public void testMultipleTheSameDictionariesProduceDictionary(TestType type) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + + testMultipleTheSameDictionariesProduceDictionary(type, positionsAppender); + // test if appender can accept different dictionary after a build + testMultipleTheSameDictionariesProduceDictionary(type, positionsAppender); + } + + private void testMultipleTheSameDictionariesProduceDictionary(TestType type, PositionsAppender positionsAppender) + { + Block dictionary = createRandomBlockForType(type, 4, 0); + positionsAppender.append(allPositions(3), createRandomDictionaryBlock(dictionary, 3)); + positionsAppender.append(allPositions(2), createRandomDictionaryBlock(dictionary, 2)); + + Block actual = positionsAppender.build(); + assertEquals(actual.getPositionCount(), 5); + assertInstanceOf(actual, DictionaryBlock.class); + assertEquals(((DictionaryBlock) actual).getDictionary(), dictionary); + } + + @Test(dataProvider = "types") + public void testDictionarySwitchToFlat(TestType type) + { + List inputs = ImmutableList.of( + input(dictionaryBlock(type, 3, 4, 0), 0, 1), + input(notNullBlock(type, 2), 0, 1)); + testAppend(type, inputs); + } + + @Test(dataProvider = "types") + public void testFlatAppendDictionary(TestType type) + { + List inputs = ImmutableList.of( + input(notNullBlock(type, 2), 0, 1), + input(dictionaryBlock(type, 3, 4, 0), 0, 1)); + testAppend(type, inputs); + } + + @Test(dataProvider = "types") + public void testDictionaryAppendDifferentDictionary(TestType type) + { + List dictionaryInputs = ImmutableList.of( + input(dictionaryBlock(type, 3, 4, 0), 0, 1), + input(dictionaryBlock(type, 2, 4, 0), 0, 1)); + testAppend(type, dictionaryInputs); + } + + @Test(dataProvider = "types") + public void testDictionarySingleThenFlat(TestType type) + { + BlockView firstInput = input(dictionaryBlock(type, 1, 4, 0), 0); + BlockView secondInput = input(dictionaryBlock(type, 2, 4, 0), 0, 1); + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); + + firstInput.getPositions().forEach((int position) -> positionsAppender.append(position, firstInput.getBlock())); + positionsAppender.append(secondInput.getPositions(), secondInput.getBlock()); + + assertBuildResult(type, ImmutableList.of(firstInput, secondInput), positionsAppender, initialRetainedSize); + } + @Test(dataProvider = "types") public void testConsecutiveBuilds(TestType type) { @@ -262,6 +327,11 @@ public void testConsecutiveBuilds(TestType type) positionsAppender.append(allPositions(10), nullRleBlock); assertBlockEquals(type.getType(), positionsAppender.build(), nullRleBlock); + // append dictionary + Block dictionaryBlock = dictionaryBlock(type, 10, 5, 0); + positionsAppender.append(allPositions(10), dictionaryBlock); + assertBlockEquals(type.getType(), positionsAppender.build(), dictionaryBlock); + // just build to confirm appender was reset assertEquals(positionsAppender.build().getPositionCount(), 0); } @@ -445,6 +515,11 @@ private void testAppendBatch(TestType type, List inputs) long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); inputs.forEach(input -> positionsAppender.append(input.getPositions(), input.getBlock())); + assertBuildResult(type, inputs, positionsAppender, initialRetainedSize); + } + + private void assertBuildResult(TestType type, List inputs, PositionsAppender positionsAppender, long initialRetainedSize) + { long sizeInBytes = positionsAppender.getSizeInBytes(); assertGreaterThanOrEqual(positionsAppender.getRetainedSizeInBytes(), sizeInBytes); Block actual = positionsAppender.build(); @@ -569,6 +644,7 @@ public void appendTo(PositionsAppender positionsAppender) private static class TestVariableWidthBlock extends AbstractVariableWidthBlock { + private static final int INSTANCE_SIZE = instanceSize(VariableWidthBlock.class); private final int arrayOffset; private final int positionCount; private final Slice slice; @@ -694,7 +770,7 @@ public long getPositionsSizeInBytes(boolean[] positions, int selectedPositionsCo @Override public long getRetainedSizeInBytes() { - throw new UnsupportedOperationException(); + return INSTANCE_SIZE + slice.getRetainedSize() + sizeOf(valueIsNull) + sizeOf(offsets); } @Override diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java index 9c806ff92daf..0c63674c3ae2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java @@ -19,8 +19,6 @@ import java.util.Optional; -import static io.trino.spi.block.DictionaryBlock.createProjectedDictionaryBlock; - public class DictionaryBlockEncoding implements BlockEncoding { @@ -52,11 +50,6 @@ public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceO // ids sliceOutput.writeBytes(dictionaryBlock.getIds()); - - // instance id - sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getMostSignificantBits()); - sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getLeastSignificantBits()); - sliceOutput.appendLong(dictionaryBlock.getDictionarySourceId().getSequenceId()); } @Override @@ -72,15 +65,8 @@ public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput sliceIn int[] ids = new int[positionCount]; sliceInput.readBytes(Slices.wrappedIntArray(ids)); - // instance id - long mostSignificantBits = sliceInput.readLong(); - long leastSignificantBits = sliceInput.readLong(); - long sequenceId = sliceInput.readLong(); - - // We always compact the dictionary before we send it. However, dictionaryBlock comes from sliceInput, which may over-retain memory. - // As a result, setting dictionaryIsCompacted to true is not appropriate here. - // TODO: fix DictionaryBlock so that dictionaryIsCompacted can be set to true when the underlying block over-retains memory. - return createProjectedDictionaryBlock(positionCount, dictionaryBlock, ids, new DictionaryId(mostSignificantBits, leastSignificantBits, sequenceId)); + // flatten the dictionary + return dictionaryBlock.copyPositions(ids, 0, ids.length); } @Override diff --git a/core/trino-spi/src/test/java/io/trino/spi/block/TestDictionaryBlockEncoding.java b/core/trino-spi/src/test/java/io/trino/spi/block/TestDictionaryBlockEncoding.java index 772ffed647f0..07fdb0d12eb5 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/block/TestDictionaryBlockEncoding.java +++ b/core/trino-spi/src/test/java/io/trino/spi/block/TestDictionaryBlockEncoding.java @@ -18,7 +18,6 @@ import static io.trino.spi.block.BlockTestUtils.assertBlockEquals; import static io.trino.spi.type.VarcharType.VARCHAR; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; public class TestDictionaryBlockEncoding @@ -40,13 +39,7 @@ public void testRoundTrip() DictionaryBlock dictionaryBlock = (DictionaryBlock) DictionaryBlock.create(ids.length, dictionary, ids); Block actualBlock = roundTripBlock(dictionaryBlock); - assertTrue(actualBlock instanceof DictionaryBlock); - DictionaryBlock actualDictionaryBlock = (DictionaryBlock) actualBlock; - assertBlockEquals(VARCHAR, actualDictionaryBlock.getDictionary(), dictionary); - for (int position = 0; position < actualDictionaryBlock.getPositionCount(); position++) { - assertEquals(actualDictionaryBlock.getId(position), ids[position]); - } - assertEquals(actualDictionaryBlock.getDictionarySourceId(), dictionaryBlock.getDictionarySourceId()); + assertBlockEquals(VARCHAR, actualBlock, dictionaryBlock); } @Test @@ -56,7 +49,6 @@ public void testNonSequentialDictionaryUnnest() DictionaryBlock dictionaryBlock = (DictionaryBlock) DictionaryBlock.create(ids.length, dictionary, ids); Block actualBlock = roundTripBlock(dictionaryBlock); - assertTrue(actualBlock instanceof DictionaryBlock); assertBlockEquals(VARCHAR, actualBlock, dictionary.getPositions(ids, 0, 4)); }