Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ private void decompress()
blockSize,
sink.getSlice().byteArray(),
sink.getSlice().byteArrayOffset() + bytesPreserved,
sink.getSlice().length());
sink.getSlice().length() - bytesPreserved);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually code below:

System.arraycopy(
                        source.getSlice().byteArray(),
                        source.getSlice().byteArrayOffset() + source.getPosition(),
                        sink.getSlice().byteArray(),
                        sink.getSlice().byteArrayOffset() + bytesPreserved,
                        blockSize);

seems broken too. Do we know that blockSize will always fit in the targetSlice given some of the data is taken by bytesPreserved @arhimondr ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int bufferSize = blockSizeInBytes
                        // to guarantee a single long can always be read entirely
                        + Long.BYTES;
                buffers[0] = new ReadBuffer(Slices.allocate(bufferSize))

The buffer size is allocated according to blockSize with an extra 8 bytes (one long) for carry-overs.

The carry over is guaranteed to never be higher than 8 bytes as this is the most what can be requested to be read in one shot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
else {
System.arraycopy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@
import com.google.common.collect.ImmutableList;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import io.trino.metadata.BlockEncodingManager;
import io.trino.metadata.InternalBlockEncodingSerde;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.VariableWidthBlock;
import io.trino.spi.block.VariableWidthBlockBuilder;
import io.trino.spi.type.Type;
import io.trino.tpch.LineItem;
import io.trino.tpch.LineItemGenerator;
import org.assertj.core.api.Assertions;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -236,4 +242,112 @@ private int serializedSize(List<? extends Type> types, Page expectedPage)

return slice.length();
}

@Test
public void testDeserializationWithRollover()
{
// test non-zero rollover when refilling buffer on deserialization
for (int blockSize = 100; blockSize < 500; blockSize += 101) {
for (int numberOfEntries = 500; numberOfEntries < 1000; numberOfEntries += 99) {
testDeserializationWithRollover(blockSize, numberOfEntries);
}
}
}

private void testDeserializationWithRollover(int blockSize, int numberOfEntries)
{
testDeserializationWithRollover(false, false, numberOfEntries, blockSize);
testDeserializationWithRollover(false, true, numberOfEntries, blockSize);
testDeserializationWithRollover(true, false, numberOfEntries, blockSize);
testDeserializationWithRollover(true, true, numberOfEntries, blockSize);
}

private void testDeserializationWithRollover(boolean encryptionEnabled, boolean compressionEnabled, int numberOfEntries, int blockSize)
{
RolloverBlockSerde blockSerde = new RolloverBlockSerde();
Optional<SecretKey> encryptionKey = encryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty();
PageSerializer serializer = new PageSerializer(blockSerde, compressionEnabled, encryptionKey, blockSize);
PageDeserializer deserializer = new PageDeserializer(blockSerde, compressionEnabled, encryptionKey, blockSize);

Page page = createTestPage(numberOfEntries);
Slice serialized = serializer.serialize(page);
Page deserialized = deserializer.deserialize(serialized);
assertEquals(deserialized.getChannelCount(), 1);

VariableWidthBlock expected = (VariableWidthBlock) page.getBlock(0);
VariableWidthBlock actual = (VariableWidthBlock) deserialized.getBlock(0);

Assertions.assertThat(actual.getRawSlice().getBytes()).isEqualTo(expected.getRawSlice().getBytes());
}

private static Page createTestPage(int numberOfEntries)
{
VariableWidthBlockBuilder blockBuilder = new VariableWidthBlockBuilder(null, 1, 1000);
blockBuilder.writeInt(numberOfEntries);
for (int i = 0; i < numberOfEntries; i++) {
blockBuilder.writeLong(i);
}
blockBuilder.closeEntry();
return new Page(blockBuilder.build());
}

private static class RolloverBlockSerde
implements BlockEncodingSerde
{
@Override
public Block readBlock(SliceInput input)
{
int numberOfEntries = input.readInt();
VariableWidthBlockBuilder blockBuilder = new VariableWidthBlockBuilder(null, 1, 1000);
blockBuilder.writeInt(numberOfEntries);
for (int i = 0; i < numberOfEntries; ++i) {
// read 8 bytes at a time
blockBuilder.writeLong(input.readLong());
}
blockBuilder.closeEntry();
return blockBuilder.build();
}

@Override
public void writeBlock(SliceOutput output, Block block)
{
int offset = 0;
int numberOfEntries = block.getInt(0, offset);
output.writeInt(numberOfEntries);
offset += 4;
for (int i = 0; i < numberOfEntries; ++i) {
long value = block.getLong(0, offset);
offset += 8;
long b7 = value >> 56 & 0xffL;
long b6 = value >> 48 & 0xffL;
long b5 = value >> 40 & 0xffL;
long b4 = value >> 32 & 0xffL;
long b3 = value >> 24 & 0xffL;
long b2 = value >> 16 & 0xffL;
long b1 = value >> 8 & 0xffL;
long b0 = value & 0xffL;
// write one byte at a time
output.writeByte((int) b0);
output.writeByte((int) b1);
output.writeByte((int) b2);
output.writeByte((int) b3);
output.writeByte((int) b4);
output.writeByte((int) b5);
output.writeByte((int) b6);
output.writeByte((int) b7);
}
}

@Override
public Type readType(SliceInput sliceInput)
{
throw new RuntimeException("not implemented");
}

@Override
public void writeType(SliceOutput sliceOutput, Type type)
{
throw new RuntimeException("not implemented");
}
}
}