diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java index c22ffca477eb..98cfda83d179 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java @@ -13,6 +13,7 @@ */ package io.trino.operator.output; +import com.google.common.annotations.VisibleForTesting; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.spi.block.Block; @@ -80,38 +81,43 @@ public void append(IntArrayList positions, Block block) return; } ensurePositionCapacity(positionCount + positions.size()); - int[] positionArray = positions.elements(); - int newByteCount = 0; - int[] lengths = new int[positions.size()]; - - if (block.mayHaveNull()) { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - if (block.isNull(position)) { - offsets[positionCount + i + 1] = offsets[positionCount + i]; - valueIsNull[positionCount + i] = true; - hasNullValue = true; + if (block instanceof VariableWidthBlock) { + VariableWidthBlock variableWidthBlock = (VariableWidthBlock) block; + int newByteCount = 0; + int[] lengths = new int[positions.size()]; + int[] sourceOffsets = new int[positions.size()]; + int[] positionArray = positions.elements(); + + if (block.mayHaveNull()) { + for (int i = 0; i < positions.size(); i++) { + int position = positionArray[i]; + int length = variableWidthBlock.getSliceLength(position); + lengths[i] = length; + sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position); + newByteCount += length; + boolean isNull = block.isNull(position); + valueIsNull[positionCount + i] = isNull; + offsets[positionCount + i + 1] = offsets[positionCount + i] + length; + hasNullValue |= isNull; + hasNonNullValue |= !isNull; } - else { - int length = block.getSliceLength(position); + } + else { + for (int i = 0; i < positions.size(); i++) { + int position = positionArray[i]; + int length = variableWidthBlock.getSliceLength(position); lengths[i] = length; + sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position); newByteCount += length; offsets[positionCount + i + 1] = offsets[positionCount + i] + length; - hasNonNullValue = true; } + hasNonNullValue = true; } + copyBytes(variableWidthBlock.getRawSlice(), lengths, sourceOffsets, positions.size(), newByteCount); } else { - for (int i = 0; i < positions.size(); i++) { - int position = positionArray[i]; - int length = block.getSliceLength(position); - lengths[i] = length; - newByteCount += length; - offsets[positionCount + i + 1] = offsets[positionCount + i] + length; - } - hasNonNullValue = true; + appendGenericBlock(positions, block); } - copyBytes(block, lengths, positionArray, positions.size(), offsets, positionCount, newByteCount); } @Override @@ -132,7 +138,7 @@ public void appendRle(RunLengthEncodedBlock block) } else { hasNonNullValue = true; - duplicateBytes(block.getValue(), 0, rlePositionCount); + duplicateBytes(block.getSlice(0, 0, block.getSliceLength(0)), rlePositionCount); } } @@ -166,16 +172,20 @@ public long getSizeInBytes() return sizeInBytes; } - private void copyBytes(Block block, int[] lengths, int[] positions, int count, int[] targetOffsets, int targetOffsetsIndex, int newByteCount) + private void copyBytes(Slice rawSlice, int[] lengths, int[] sourceOffsets, int count, int newByteCount) { - ensureBytesCapacity(getCurrentOffset() + newByteCount); + ensureExtraBytesCapacity(newByteCount); - for (int i = 0; i < count; i++) { - int position = positions[i]; - if (!block.isNull(position)) { - int length = lengths[i]; - Slice slice = block.getSlice(position, 0, length); - slice.getBytes(0, bytes, targetOffsets[targetOffsetsIndex + i], length); + if (rawSlice.hasByteArray()) { + byte[] base = rawSlice.byteArray(); + int byteArrayOffset = rawSlice.byteArrayOffset(); + for (int i = 0; i < count; i++) { + System.arraycopy(base, byteArrayOffset + sourceOffsets[i], bytes, offsets[positionCount + i], lengths[i]); + } + } + else { + for (int i = 0; i < count; i++) { + rawSlice.getBytes(sourceOffsets[i], bytes, offsets[positionCount + i], lengths[i]); } } @@ -184,25 +194,75 @@ private void copyBytes(Block block, int[] lengths, int[] positions, int count, i } /** - * Copy {@code length} bytes from {@code block}, at position {@code position} to {@code count} consecutive positions in the {@link #bytes} array. + * Copy all bytes from {@code slice} to {@code count} consecutive positions in the {@link #bytes} array. */ - private void duplicateBytes(Block block, int position, int count) + private void duplicateBytes(Slice slice, int count) { - int length = block.getSliceLength(position); + int length = slice.length(); int newByteCount = toIntExact((long) count * length); int startOffset = getCurrentOffset(); - ensureBytesCapacity(startOffset + newByteCount); + ensureExtraBytesCapacity(newByteCount); + + duplicateBytes(slice, bytes, startOffset, count); - Slice slice = block.getSlice(position, 0, length); + int currentStartOffset = startOffset + length; for (int i = 0; i < count; i++) { - slice.getBytes(0, bytes, startOffset + (i * length), length); - offsets[positionCount + i + 1] = startOffset + ((i + 1) * length); + offsets[positionCount + i + 1] = currentStartOffset; + currentStartOffset += length; } positionCount += count; updateSize(count, newByteCount); } + /** + * Copy {@code length} bytes from {@code slice}, starting at offset {@code sourceOffset} to {@code count} consecutive positions in the {@link #bytes} array. + */ + @VisibleForTesting + static void duplicateBytes(Slice slice, byte[] bytes, int startOffset, int count) + { + int length = slice.length(); + if (length == 0) { + // nothing to copy + return; + } + // copy slice to the first position + slice.getBytes(0, bytes, startOffset, length); + int totalDuplicatedBytes = count * length; + int duplicatedBytes = length; + // copy every byte copied so far, doubling the number of bytes copied on evey iteration + while (duplicatedBytes * 2 <= totalDuplicatedBytes) { + System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, duplicatedBytes); + duplicatedBytes = duplicatedBytes * 2; + } + // copy the leftover + System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, totalDuplicatedBytes - duplicatedBytes); + } + + private void appendGenericBlock(IntArrayList positions, Block block) + { + int newByteCount = 0; + for (int i = 0; i < positions.size(); i++) { + int position = positions.getInt(i); + if (block.isNull(position)) { + offsets[positionCount + 1] = offsets[positionCount]; + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + int length = block.getSliceLength(position); + ensureExtraBytesCapacity(length); + Slice slice = block.getSlice(position, 0, length); + slice.getBytes(0, bytes, offsets[positionCount], length); + offsets[positionCount + 1] = offsets[positionCount] + length; + hasNonNullValue = true; + newByteCount += length; + } + positionCount++; + } + updateSize(positions.size(), newByteCount); + } + private void reset() { initialEntryCount = calculateBlockResetSize(positionCount); @@ -228,12 +288,13 @@ private void updateSize(long positionsSize, int bytesWritten) sizeInBytes += (SIZE_OF_BYTE + SIZE_OF_INT) * positionsSize + bytesWritten; } - private void ensureBytesCapacity(int bytesCapacity) + private void ensureExtraBytesCapacity(int extraBytesCapacity) { - if (bytes.length < bytesCapacity) { + int totalBytesCapacity = getCurrentOffset() + extraBytesCapacity; + if (bytes.length < totalBytesCapacity) { int newBytesLength = Math.max(bytes.length, initialBytesSize); - if (bytesCapacity > newBytesLength) { - newBytesLength = Math.max(bytesCapacity, calculateNewArraySize(newBytesLength)); + if (totalBytesCapacity > newBytesLength) { + newBytesLength = Math.max(totalBytesCapacity, calculateNewArraySize(newBytesLength)); } bytes = Arrays.copyOf(bytes, newBytesLength); updateRetainedSize(); diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java index 7e0f6bba624e..183212d586d2 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java @@ -14,7 +14,10 @@ package io.trino.operator.output; import com.google.common.collect.ImmutableList; +import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.trino.block.BlockAssertions; +import io.trino.spi.block.AbstractVariableWidthBlock; import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.BlockBuilderStatus; @@ -22,20 +25,37 @@ import io.trino.spi.block.PageBuilderStatus; import io.trino.spi.block.RowBlock; import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.VariableWidthBlock; import io.trino.spi.type.ArrayType; +import io.trino.spi.type.BigintType; +import io.trino.spi.type.BooleanType; import io.trino.spi.type.Decimals; +import io.trino.spi.type.DoubleType; +import io.trino.spi.type.IntegerType; import io.trino.spi.type.LongTimestamp; import io.trino.spi.type.RowType; +import io.trino.spi.type.SmallintType; +import io.trino.spi.type.TinyintType; import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; import io.trino.type.BlockTypeOperators; import it.unimi.dsi.fastutil.ints.IntArrayList; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import javax.annotation.Nullable; + +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; +import java.util.function.Function; +import java.util.function.ObjLongConsumer; import java.util.stream.IntStream; +import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.Slices.EMPTY_SLICE; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.testing.Assertions.assertInstanceOf; import static io.trino.block.BlockAssertions.assertBlockEquals; @@ -46,7 +66,6 @@ import static io.trino.block.BlockAssertions.createLongDecimalsBlock; import static io.trino.block.BlockAssertions.createLongTimestampBlock; import static io.trino.block.BlockAssertions.createLongsBlock; -import static io.trino.block.BlockAssertions.createRandomBlockForType; import static io.trino.block.BlockAssertions.createRandomDictionaryBlock; import static io.trino.block.BlockAssertions.createSlicesBlock; import static io.trino.block.BlockAssertions.createSmallintsBlock; @@ -55,16 +74,10 @@ import static io.trino.spi.block.DictionaryId.randomDictionaryId; import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; import static io.trino.spi.type.BigintType.BIGINT; -import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.CharType.createCharType; import static io.trino.spi.type.DecimalType.createDecimalType; -import static io.trino.spi.type.DoubleType.DOUBLE; -import static io.trino.spi.type.IntegerType.INTEGER; import static io.trino.spi.type.RowType.anonymousRow; -import static io.trino.spi.type.SmallintType.SMALLINT; import static io.trino.spi.type.TimestampType.createTimestampType; -import static io.trino.spi.type.TinyintType.TINYINT; -import static io.trino.spi.type.VarbinaryType.VARBINARY; import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static java.util.Objects.requireNonNull; @@ -76,7 +89,7 @@ public class TestPositionsAppender private static final PositionsAppenderFactory POSITIONS_APPENDER_FACTORY = new PositionsAppenderFactory(new BlockTypeOperators()); @Test(dataProvider = "types") - public void testMixedBlockTypes(Type type) + public void testMixedBlockTypes(TestType type) { List input = ImmutableList.of( input(emptyBlock(type)), @@ -103,16 +116,16 @@ public void testMixedBlockTypes(Type type) testAppend(type, input); } - @Test(dataProvider = "nullRleTypes") - public void testNullRle(Type type) + @Test(dataProvider = "types") + public void testNullRle(TestType type) { - testNullRle(type, nullBlock(type, 2)); - testNullRle(type, nullRleBlock(type, 2)); - testNullRle(type, createRandomBlockForType(type, 4, 0.5f)); + testNullRle(type.getType(), nullBlock(type, 2)); + testNullRle(type.getType(), nullRleBlock(type, 2)); + testNullRle(type.getType(), createRandomBlockForType(type, 4, 0.5f)); } @Test(dataProvider = "types") - public void testRleSwitchToFlat(Type type) + public void testRleSwitchToFlat(TestType type) { List inputs = ImmutableList.of( input(rleBlock(type, 3), 0, 1), @@ -126,7 +139,7 @@ public void testRleSwitchToFlat(Type type) } @Test(dataProvider = "types") - public void testFlatAppendRle(Type type) + public void testFlatAppendRle(TestType type) { List inputs = ImmutableList.of( input(notNullBlock(type, 2), 0, 1), @@ -140,7 +153,7 @@ public void testFlatAppendRle(Type type) } @Test(dataProvider = "differentValues") - public void testMultipleRleBlocksWithDifferentValues(Type type, Block value1, Block value2) + public void testMultipleRleBlocksWithDifferentValues(TestType type, Block value1, Block value2) { List input = ImmutableList.of( input(rleBlock(value1, 3), 0, 1), @@ -153,28 +166,27 @@ public static Object[][] differentValues() { return new Object[][] { - {BIGINT, createLongsBlock(0), createLongsBlock(1)}, - {BOOLEAN, createBooleansBlock(true), createBooleansBlock(false)}, - {INTEGER, createIntsBlock(0), createIntsBlock(1)}, - {createCharType(10), createStringsBlock("0"), createStringsBlock("1")}, - {createUnboundedVarcharType(), createStringsBlock("0"), createStringsBlock("1")}, - {DOUBLE, createDoublesBlock(0D), createDoublesBlock(1D)}, - {SMALLINT, createSmallintsBlock(0), createSmallintsBlock(1)}, - {TINYINT, createTinyintsBlock(0), createTinyintsBlock(1)}, - {VARBINARY, createSlicesBlock(Slices.wrappedLongArray(0)), createSlicesBlock(Slices.wrappedLongArray(1))}, - {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1), createLongDecimalsBlock("0"), createLongDecimalsBlock("1")}, - {new ArrayType(BIGINT), createArrayBigintBlock(ImmutableList.of(ImmutableList.of(0L))), createArrayBigintBlock(ImmutableList.of(ImmutableList.of(1L)))}, - { - createTimestampType(9), - createLongTimestampBlock(createTimestampType(9), new LongTimestamp(0, 0)), - createLongTimestampBlock(createTimestampType(9), new LongTimestamp(1, 0))} + {TestType.BIGINT, createLongsBlock(0), createLongsBlock(1)}, + {TestType.BOOLEAN, createBooleansBlock(true), createBooleansBlock(false)}, + {TestType.INTEGER, createIntsBlock(0), createIntsBlock(1)}, + {TestType.CHAR_10, createStringsBlock("0"), createStringsBlock("1")}, + {TestType.VARCHAR, createStringsBlock("0"), createStringsBlock("1")}, + {TestType.DOUBLE, createDoublesBlock(0D), createDoublesBlock(1D)}, + {TestType.SMALLINT, createSmallintsBlock(0), createSmallintsBlock(1)}, + {TestType.TINYINT, createTinyintsBlock(0), createTinyintsBlock(1)}, + {TestType.VARBINARY, createSlicesBlock(Slices.wrappedLongArray(0)), createSlicesBlock(Slices.wrappedLongArray(1))}, + {TestType.LONG_DECIMAL, createLongDecimalsBlock("0"), createLongDecimalsBlock("1")}, + {TestType.ARRAY_BIGINT, createArrayBigintBlock(ImmutableList.of(ImmutableList.of(0L))), createArrayBigintBlock(ImmutableList.of(ImmutableList.of(1L)))}, + {TestType.LONG_TIMESTAMP, createLongTimestampBlock(createTimestampType(9), new LongTimestamp(0, 0)), + createLongTimestampBlock(createTimestampType(9), new LongTimestamp(1, 0))}, + {TestType.VARCHAR_WITH_TEST_BLOCK, TestVariableWidthBlock.adapt(createStringsBlock("0")), TestVariableWidthBlock.adapt(createStringsBlock("1"))} }; } @Test(dataProvider = "types") - public void testMultipleRleWithTheSameValueProduceRle(Type type) + public void testMultipleRleWithTheSameValueProduceRle(TestType type) { - PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); Block value = notNullBlock(type, 1); positionsAppender.append(allPositions(3), rleBlock(value, 3)); @@ -186,9 +198,9 @@ public void testMultipleRleWithTheSameValueProduceRle(Type type) } @Test(dataProvider = "types") - public void testConsecutiveBuilds(Type type) + public void testConsecutiveBuilds(TestType type) { - PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); // empty block positionsAppender.append(positions(), emptyBlock(type)); @@ -204,12 +216,12 @@ public void testConsecutiveBuilds(Type type) // append null and not null position positionsAppender.append(allPositions(2), block); - assertBlockEquals(type, positionsAppender.build(), block); + assertBlockEquals(type.getType(), positionsAppender.build(), block); // append not null rle Block rleBlock = rleBlock(type, 1); positionsAppender.append(allPositions(1), rleBlock); - assertBlockEquals(type, positionsAppender.build(), rleBlock); + assertBlockEquals(type.getType(), positionsAppender.build(), rleBlock); // append empty rle positionsAppender.append(positions(), rleBlock(type, 0)); @@ -218,7 +230,7 @@ public void testConsecutiveBuilds(Type type) // append null rle Block nullRleBlock = nullRleBlock(type, 1); positionsAppender.append(allPositions(1), nullRleBlock); - assertBlockEquals(type, positionsAppender.build(), nullRleBlock); + assertBlockEquals(type.getType(), positionsAppender.build(), nullRleBlock); // just build to confirm appender was reset assertEquals(positionsAppender.build().getPositionCount(), 0); @@ -246,9 +258,9 @@ public void testRowWithNestedFields() { RowType type = anonymousRow(BIGINT, BIGINT, VARCHAR); Block rowBLock = RowBlock.fromFieldBlocks(2, Optional.empty(), new Block[] { - notNullBlock(BIGINT, 2), - dictionaryBlock(BIGINT, 2, 2, 0.5F), - rleBlock(VARCHAR, 2) + notNullBlock(TestType.BIGINT, 2), + dictionaryBlock(TestType.BIGINT, 2, 2, 0.5F), + rleBlock(TestType.VARCHAR, 2) }); PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); @@ -259,45 +271,12 @@ public void testRowWithNestedFields() assertBlockEquals(type, actual, rowBLock); } - @DataProvider(name = "nullRleTypes") - public static Object[][] nullRleTypes() - { - return new Object[][] - { - {BIGINT}, - {BOOLEAN}, - {INTEGER}, - {createCharType(10)}, - {createUnboundedVarcharType()}, - {DOUBLE}, - {SMALLINT}, - {TINYINT}, - {VARBINARY}, - {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)}, - {createTimestampType(9)}, - {anonymousRow(BIGINT, VARCHAR)} - }; - } - @DataProvider(name = "types") public static Object[][] types() { - return new Object[][] - { - {BIGINT}, - {BOOLEAN}, - {INTEGER}, - {createCharType(10)}, - {createUnboundedVarcharType()}, - {DOUBLE}, - {SMALLINT}, - {TINYINT}, - {VARBINARY}, - {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)}, - {new ArrayType(BIGINT)}, - {createTimestampType(9)}, - {anonymousRow(BIGINT, VARCHAR)} - }; + return Arrays.stream(TestType.values()) + .map(type -> new Object[] {type}) + .toArray(Object[][]::new); } private static Block singleValueBlock(String value) @@ -332,7 +311,7 @@ private DictionaryBlock dictionaryBlock(Block dictionary, int[] ids) return new DictionaryBlock(0, ids.length, dictionary, ids, false, randomDictionaryId()); } - private DictionaryBlock dictionaryBlock(Type type, int positionCount, int dictionarySize, float nullRate) + private DictionaryBlock dictionaryBlock(TestType type, int positionCount, int dictionarySize, float nullRate) { Block dictionary = createRandomBlockForType(type, dictionarySize, nullRate); return createRandomDictionaryBlock(dictionary, positionCount); @@ -343,40 +322,45 @@ private RunLengthEncodedBlock rleBlock(Block value, int positionCount) return new RunLengthEncodedBlock(value, positionCount); } - private RunLengthEncodedBlock rleBlock(Type type, int positionCount) + private RunLengthEncodedBlock rleBlock(TestType type, int positionCount) { Block rleValue = createRandomBlockForType(type, 1, 0); return new RunLengthEncodedBlock(rleValue, positionCount); } - private RunLengthEncodedBlock nullRleBlock(Type type, int positionCount) + private RunLengthEncodedBlock nullRleBlock(TestType type, int positionCount) { Block rleValue = nullBlock(type, 1); return new RunLengthEncodedBlock(rleValue, positionCount); } - private Block partiallyNullBlock(Type type, int positionCount) + private Block partiallyNullBlock(TestType type, int positionCount) { return createRandomBlockForType(type, positionCount, 0.5F); } - private Block notNullBlock(Type type, int positionCount) + private Block notNullBlock(TestType type, int positionCount) { return createRandomBlockForType(type, positionCount, 0); } - private Block nullBlock(Type type, int positionCount) + private Block nullBlock(TestType type, int positionCount) { - BlockBuilder blockBuilder = type.createBlockBuilder(null, positionCount); + BlockBuilder blockBuilder = type.getType().createBlockBuilder(null, positionCount); for (int i = 0; i < positionCount; i++) { blockBuilder.appendNull(); } - return blockBuilder.build(); + return type.adapt(blockBuilder.build()); + } + + private Block emptyBlock(TestType type) + { + return type.adapt(type.getType().createBlockBuilder(null, 0).build()); } - private Block emptyBlock(Type type) + private Block createRandomBlockForType(TestType type, int positionCount, float nullRate) { - return type.createBlockBuilder(null, 0).build(); + return type.adapt(BlockAssertions.createRandomBlockForType(type.getType(), positionCount, nullRate)); } private void testNullRle(Type type, Block source) @@ -398,9 +382,9 @@ private void testNullRle(Type type, Block source) assertInstanceOf(actual, RunLengthEncodedBlock.class); } - private void testAppend(Type type, List inputs) + private void testAppend(TestType type, List inputs) { - PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); inputs.forEach(input -> positionsAppender.append(input.getPositions(), input.getBlock())); @@ -408,7 +392,7 @@ private void testAppend(Type type, List inputs) assertGreaterThanOrEqual(positionsAppender.getRetainedSizeInBytes(), sizeInBytes); Block actual = positionsAppender.build(); - assertBlockIsValid(actual, sizeInBytes, type, inputs); + assertBlockIsValid(actual, sizeInBytes, type.getType(), inputs); // verify positionsAppender reset assertEquals(positionsAppender.getSizeInBytes(), 0); assertEquals(positionsAppender.getRetainedSizeInBytes(), initialRetainedSize); @@ -437,6 +421,48 @@ private Block buildBlock(Type type, List inputs, BlockBuilderStatus b return blockBuilder.build(); } + private enum TestType + { + BIGINT(BigintType.BIGINT), + BOOLEAN(BooleanType.BOOLEAN), + INTEGER(IntegerType.INTEGER), + CHAR_10(createCharType(10)), + VARCHAR(createUnboundedVarcharType()), + DOUBLE(DoubleType.DOUBLE), + SMALLINT(SmallintType.SMALLINT), + TINYINT(TinyintType.TINYINT), + VARBINARY(VarbinaryType.VARBINARY), + LONG_DECIMAL(createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)), + LONG_TIMESTAMP(createTimestampType(9)), + ROW_BIGINT_VARCHAR(anonymousRow(BigintType.BIGINT, VarcharType.VARCHAR)), + ARRAY_BIGINT(new ArrayType(BigintType.BIGINT)), + VARCHAR_WITH_TEST_BLOCK(VarcharType.VARCHAR, TestVariableWidthBlock.adaptation()); + + private final Type type; + private final Function blockAdaptation; + + TestType(Type type) + { + this(type, Function.identity()); + } + + TestType(Type type, Function blockAdaptation) + { + this.type = requireNonNull(type, "type is null"); + this.blockAdaptation = requireNonNull(blockAdaptation, "blockAdaptation is null"); + } + + public Block adapt(Block block) + { + return blockAdaptation.apply(block); + } + + public Type getType() + { + return type; + } + } + private static class BlockView { private final Block block; @@ -463,4 +489,160 @@ public void appendTo(PositionsAppender positionsAppender) positionsAppender.append(getPositions(), getBlock()); } } + + private static class TestVariableWidthBlock + extends AbstractVariableWidthBlock + { + private final int arrayOffset; + private final int positionCount; + private final Slice slice; + private final int[] offsets; + @Nullable + private final boolean[] valueIsNull; + + private static Function adaptation() + { + return TestVariableWidthBlock::adapt; + } + + private static Block adapt(Block block) + { + if (block instanceof RunLengthEncodedBlock) { + checkArgument(block.getPositionCount() == 0 || block.isNull(0)); + return new RunLengthEncodedBlock(new TestVariableWidthBlock(0, 1, EMPTY_SLICE, new int[] {0, 0}, new boolean[] {true}), block.getPositionCount()); + } + + int[] offsets = new int[block.getPositionCount() + 1]; + boolean[] valueIsNull = new boolean[block.getPositionCount()]; + boolean hasNullValue = false; + for (int i = 0; i < block.getPositionCount(); i++) { + if (block.isNull(i)) { + valueIsNull[i] = true; + hasNullValue = true; + offsets[i + 1] = offsets[i]; + } + else { + offsets[i + 1] = offsets[i] + block.getSliceLength(i); + } + } + + return new TestVariableWidthBlock(0, block.getPositionCount(), ((VariableWidthBlock) block).getRawSlice(), offsets, hasNullValue ? valueIsNull : null); + } + + private TestVariableWidthBlock(int arrayOffset, int positionCount, Slice slice, int[] offsets, boolean[] valueIsNull) + { + checkArgument(arrayOffset >= 0); + this.arrayOffset = arrayOffset; + checkArgument(positionCount >= 0); + this.positionCount = positionCount; + this.slice = requireNonNull(slice, "slice is null"); + this.offsets = offsets; + this.valueIsNull = valueIsNull; + } + + @Override + protected Slice getRawSlice(int position) + { + return slice; + } + + @Override + protected int getPositionOffset(int position) + { + return offsets[position + arrayOffset]; + } + + @Override + public int getSliceLength(int position) + { + return getPositionOffset(position + 1) - getPositionOffset(position); + } + + @Override + protected boolean isEntryNull(int position) + { + return valueIsNull != null && valueIsNull[position + arrayOffset]; + } + + @Override + public int getPositionCount() + { + return positionCount; + } + + @Override + public Block getRegion(int positionOffset, int length) + { + return new TestVariableWidthBlock(positionOffset + arrayOffset, length, slice, offsets, valueIsNull); + } + + @Override + public Block getSingleValueBlock(int position) + { + if (isNull(position)) { + return new TestVariableWidthBlock(0, 1, EMPTY_SLICE, new int[] {0, 0}, new boolean[] {true}); + } + + int offset = getPositionOffset(position); + int entrySize = getSliceLength(position); + + Slice copy = Slices.copyOf(getRawSlice(position), offset, entrySize); + + return new TestVariableWidthBlock(0, 1, copy, new int[] {0, copy.length()}, null); + } + + @Override + public long getSizeInBytes() + { + throw new UnsupportedOperationException(); + } + + @Override + public long getRegionSizeInBytes(int position, int length) + { + throw new UnsupportedOperationException(); + } + + @Override + public OptionalInt fixedSizeInBytesPerPosition() + { + return OptionalInt.empty(); + } + + @Override + public long getPositionsSizeInBytes(boolean[] positions, int selectedPositionsCount) + { + throw new UnsupportedOperationException(); + } + + @Override + public long getRetainedSizeInBytes() + { + throw new UnsupportedOperationException(); + } + + @Override + public void retainedBytesForEachPart(ObjLongConsumer consumer) + { + throw new UnsupportedOperationException(); + } + + @Override + public Block copyPositions(int[] positions, int offset, int length) + { + throw new UnsupportedOperationException(); + } + + @Override + public Block copyRegion(int position, int length) + { + throw new UnsupportedOperationException(); + } + + @Override + public Block copyWithAppendedNull() + { + throw new UnsupportedOperationException(); + } + } } diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java new file mode 100644 index 000000000000..a102a75667c8 --- /dev/null +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestSlicePositionsAppender.java @@ -0,0 +1,144 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.output; + +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import io.trino.spi.block.Block; +import io.trino.spi.block.RunLengthEncodedBlock; +import io.trino.spi.block.VariableWidthBlock; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Optional; + +import static io.trino.block.BlockAssertions.assertBlockEquals; +import static io.trino.block.BlockAssertions.createStringsBlock; +import static io.trino.operator.output.SlicePositionsAppender.duplicateBytes; +import static io.trino.spi.block.PageBuilderStatus.DEFAULT_MAX_PAGE_SIZE_IN_BYTES; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals; + +public class TestSlicePositionsAppender +{ + @Test + public void testAppendEmptySliceRle() + { + // test SlicePositionAppender.appendRle with empty value (Slice with length 0) + PositionsAppender positionsAppender = new SlicePositionsAppender(1, 100); + RunLengthEncodedBlock rleBlock = new RunLengthEncodedBlock(createStringsBlock(""), 10); + positionsAppender.appendRle(rleBlock); + + Block actualBlock = positionsAppender.build(); + + assertBlockEquals(VARCHAR, actualBlock, rleBlock); + } + + // test append with VariableWidthBlock using Slice not backed by byte array + // to test special handling in SlicePositionsAppender.copyBytes + @Test + public void testAppendSliceNotBackedByByteArray() + { + PositionsAppender positionsAppender = new SlicePositionsAppender(1, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + Block block = new VariableWidthBlock(3, Slices.wrappedLongArray(257, 2), new int[] {0, 1, Long.BYTES, 2 * Long.BYTES}, Optional.empty()); + positionsAppender.append(IntArrayList.wrap(new int[] {0, 2}), block); + + Block actual = positionsAppender.build(); + + Block expected = new VariableWidthBlock( + 2, + Slices.wrappedBuffer(new byte[] {1, 2, 0, 0, 0, 0, 0, 0, 0}), + new int[] {0, 1, Long.BYTES + 1}, + Optional.empty()); + assertBlockEquals(VARCHAR, actual, expected); + } + + @Test + public void testDuplicateZeroLength() + { + Slice slice = Slices.wrappedBuffer(); + byte[] target = new byte[] {-1}; + duplicateBytes(slice, target, 0, 100); + assertArrayEquals(new byte[] {-1}, target); + } + + @Test + public void testDuplicate1Byte() + { + Slice slice = Slices.wrappedBuffer(new byte[] {2}); + byte[] target = new byte[5]; + Arrays.fill(target, (byte) -1); + duplicateBytes(slice, target, 3, 2); + assertArrayEquals(new byte[] {-1, -1, -1, 2, 2}, target); + } + + @Test + public void testDuplicate2Bytes() + { + Slice slice = Slices.wrappedBuffer(new byte[] {1, 2}); + byte[] target = new byte[8]; + Arrays.fill(target, (byte) -1); + duplicateBytes(slice, target, 1, 3); + assertArrayEquals(new byte[] {-1, 1, 2, 1, 2, 1, 2, -1}, target); + } + + @Test + public void testDuplicate1Time() + { + Slice slice = Slices.wrappedBuffer(new byte[] {1, 2}); + byte[] target = new byte[8]; + Arrays.fill(target, (byte) -1); + + duplicateBytes(slice, target, 1, 1); + + assertArrayEquals(new byte[] {-1, 1, 2, -1, -1, -1, -1, -1}, target); + } + + @Test + public void testDuplicateMultipleBytesOffNumberOfTimes() + { + Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); + byte[] target = new byte[17]; + Arrays.fill(target, (byte) -1); + + duplicateBytes(slice, target, 1, 5); + + assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); + } + + @Test + public void testDuplicateMultipleBytesEvenNumberOfTimes() + { + Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); + byte[] target = new byte[20]; + Arrays.fill(target, (byte) -1); + + duplicateBytes(slice, target, 1, 6); + + assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); + } + + @Test + public void testDuplicateMultipleBytesPowerOfTwoNumberOfTimes() + { + Slice slice = Slices.wrappedBuffer(new byte[] {5, 3, 1}); + byte[] target = new byte[14]; + Arrays.fill(target, (byte) -1); + + duplicateBytes(slice, target, 1, 4); + + assertArrayEquals(new byte[] {-1, 5, 3, 1, 5, 3, 1, 5, 3, 1, 5, 3, 1, -1}, target); + } +} diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index f8d8cc09cd55..b228d848640e 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -193,6 +193,10 @@ java.field.removed field io.trino.spi.expression.StandardFunctions.LIKE_PATTERN_FUNCTION_NAME + + java.method.returnTypeChanged + method io.trino.spi.block.Block io.trino.spi.block.ArrayBlockBuilder::build() + diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockBuilder.java b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockBuilder.java index 1748121fa8fa..fc67f31ac585 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockBuilder.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockBuilder.java @@ -23,6 +23,8 @@ import static io.airlift.slice.SizeOf.sizeOf; import static io.trino.spi.block.ArrayBlock.createArrayBlockInternal; +import static io.trino.spi.block.BlockUtil.checkArrayRange; +import static io.trino.spi.block.BlockUtil.checkValidRegion; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; @@ -42,6 +44,7 @@ public class ArrayBlockBuilder private int[] offsets = new int[1]; private boolean[] valueIsNull = new boolean[0]; private boolean hasNullValue; + private boolean hasNonNullRow; private final BlockBuilder values; private boolean currentEntryOpened; @@ -186,6 +189,7 @@ private void entryAdded(boolean isNull) offsets[positionCount + 1] = values.getPositionCount(); valueIsNull[positionCount] = isNull; hasNullValue |= isNull; + hasNonNullRow |= !isNull; positionCount++; if (blockBuilderStatus != null) { @@ -218,11 +222,14 @@ private void updateDataSize() } @Override - public ArrayBlock build() + public Block build() { if (currentEntryOpened) { throw new IllegalStateException("Current entry must be closed before the block can be built"); } + if (!hasNonNullRow) { + return nullRle(positionCount); + } return createArrayBlockInternal(0, positionCount, hasNullValue ? valueIsNull : null, offsets, values.build()); } @@ -240,4 +247,45 @@ public String toString() sb.append('}'); return sb.toString(); } + + @Override + public Block copyPositions(int[] positions, int offset, int length) + { + checkArrayRange(positions, offset, length); + + if (!hasNonNullRow) { + return nullRle(length); + } + return super.copyPositions(positions, offset, length); + } + + @Override + public Block getRegion(int position, int length) + { + int positionCount = getPositionCount(); + checkValidRegion(positionCount, position, length); + + if (!hasNonNullRow) { + return nullRle(length); + } + return super.getRegion(position, length); + } + + @Override + public Block copyRegion(int position, int length) + { + int positionCount = getPositionCount(); + checkValidRegion(positionCount, position, length); + + if (!hasNonNullRow) { + return nullRle(length); + } + return super.copyRegion(position, length); + } + + private RunLengthEncodedBlock nullRle(int positionCount) + { + ArrayBlock nullValueBlock = createArrayBlockInternal(0, 1, new boolean[] {true}, new int[] {0, 0}, values.newBlockBuilderLike(null).build()); + return new RunLengthEncodedBlock(nullValueBlock, positionCount); + } } diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java index 2fcab2bd3e3c..71db3ba0e661 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java @@ -85,6 +85,23 @@ public VariableWidthBlock(int positionCount, Slice slice, int[] offsets, Optiona retainedSizeInBytes = INSTANCE_SIZE + slice.getRetainedSize() + sizeOf(valueIsNull) + sizeOf(offsets); } + /** + * Gets the raw {@link Slice} that keeps the actual data bytes. + */ + public Slice getRawSlice() + { + return slice; + } + + /** + * Gets the offset of the value at the {@code position} in the {@link Slice} returned by {@link #getRawSlice())}. + */ + public int getRawSliceOffset(int position) + { + checkReadablePosition(this, position); + return getPositionOffset(position); + } + @Override protected final int getPositionOffset(int position) { diff --git a/core/trino-spi/src/test/java/io/trino/spi/block/TestArrayBlockBuilder.java b/core/trino-spi/src/test/java/io/trino/spi/block/TestArrayBlockBuilder.java index 5d669ac76f4c..9b69753502b2 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/block/TestArrayBlockBuilder.java +++ b/core/trino-spi/src/test/java/io/trino/spi/block/TestArrayBlockBuilder.java @@ -73,4 +73,36 @@ public void testConcurrentWriting() .isInstanceOf(IllegalStateException.class) .hasMessage("Expected current entry to be closed but was opened"); } + + @Test + public void testBuilderProducesNullRleForNullRows() + { + // empty block + assertIsNullRle(blockBuilder().build(), 0); + + // single null + assertIsNullRle(blockBuilder().appendNull().build(), 1); + + // multiple nulls + assertIsNullRle(blockBuilder().appendNull().appendNull().build(), 2); + + BlockBuilder blockBuilder = blockBuilder().appendNull().appendNull(); + assertIsNullRle(blockBuilder.copyPositions(new int[] {0}, 0, 1), 1); + assertIsNullRle(blockBuilder.getRegion(0, 1), 1); + assertIsNullRle(blockBuilder.copyRegion(0, 1), 1); + } + + private static BlockBuilder blockBuilder() + { + return new ArrayBlockBuilder(BIGINT, null, 10); + } + + private void assertIsNullRle(Block block, int expectedPositionCount) + { + assertEquals(block.getPositionCount(), expectedPositionCount); + assertEquals(block.getClass(), RunLengthEncodedBlock.class); + if (expectedPositionCount > 0) { + assertTrue(block.isNull(0)); + } + } } diff --git a/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/datatypes/TrinoThriftBigintArray.java b/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/datatypes/TrinoThriftBigintArray.java index 525b9c2f7173..56874d0fed87 100644 --- a/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/datatypes/TrinoThriftBigintArray.java +++ b/plugin/trino-thrift-api/src/main/java/io/trino/plugin/thrift/api/datatypes/TrinoThriftBigintArray.java @@ -21,6 +21,7 @@ import io.trino.spi.block.ArrayBlock; import io.trino.spi.block.Block; import io.trino.spi.block.LongArrayBlock; +import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.type.Type; import javax.annotation.Nullable; @@ -144,12 +145,18 @@ public String toString() public static TrinoThriftBlock fromBlock(Block block) { - checkArgument(block instanceof AbstractArrayBlock, "block is not of an array type"); - AbstractArrayBlock arrayBlock = (AbstractArrayBlock) block; - int positions = arrayBlock.getPositionCount(); + int positions = block.getPositionCount(); if (positions == 0) { return bigintArrayData(new TrinoThriftBigintArray(null, null, null)); } + if (block instanceof RunLengthEncodedBlock && block.isNull(0)) { + boolean[] nulls = new boolean[positions]; + Arrays.fill(nulls, true); + return bigintArrayData(new TrinoThriftBigintArray(nulls, null, null)); + } + checkArgument(block instanceof AbstractArrayBlock, "block is not of an array type"); + AbstractArrayBlock arrayBlock = (AbstractArrayBlock) block; + boolean[] nulls = null; int[] sizes = null; for (int position = 0; position < positions; position++) {