diff --git a/core/trino-main/src/main/java/io/trino/block/BlockJsonSerde.java b/core/trino-main/src/main/java/io/trino/block/BlockJsonSerde.java index 6917b36ac61f..8ebbbc7fe746 100644 --- a/core/trino-main/src/main/java/io/trino/block/BlockJsonSerde.java +++ b/core/trino-main/src/main/java/io/trino/block/BlockJsonSerde.java @@ -54,8 +54,7 @@ public Serializer(BlockEncodingSerde blockEncodingSerde) public void serialize(Block block, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException { - // Encoding name is length prefixed as are many block encodings - SliceOutput output = new DynamicSliceOutput(toIntExact(block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES))); + SliceOutput output = new DynamicSliceOutput(toIntExact(blockEncodingSerde.estimatedWriteSize(block))); writeBlock(blockEncodingSerde, output, block); Slice slice = output.slice(); jsonGenerator.writeBinary(Base64Variants.MIME_NO_LINEFEEDS, slice.byteArray(), slice.byteArrayOffset(), slice.length()); diff --git a/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java b/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java index 3adb72ce07d7..97a02dfb738a 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/BlockEncodingManager.java @@ -14,6 +14,7 @@ package io.trino.metadata; import io.trino.spi.block.ArrayBlockEncoding; +import io.trino.spi.block.Block; import io.trino.spi.block.BlockEncoding; import io.trino.spi.block.ByteArrayBlockEncoding; import io.trino.spi.block.DictionaryBlockEncoding; @@ -36,7 +37,10 @@ public final class BlockEncodingManager { - private final Map blockEncodings = new ConcurrentHashMap<>(); + // for deserialization + private final Map blockEncodingsByName = new ConcurrentHashMap<>(); + // for serialization + private final Map, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>(); public BlockEncodingManager() { @@ -56,17 +60,26 @@ public BlockEncodingManager() addBlockEncoding(new LazyBlockEncoding()); } - public BlockEncoding getBlockEncoding(String encodingName) + public BlockEncoding getBlockEncodingByName(String encodingName) { - BlockEncoding blockEncoding = blockEncodings.get(encodingName); + BlockEncoding blockEncoding = blockEncodingsByName.get(encodingName); checkArgument(blockEncoding != null, "Unknown block encoding: %s", encodingName); return blockEncoding; } + public BlockEncoding getBlockEncodingByBlockClass(Class clazz) + { + BlockEncoding blockEncoding = blockEncodingNamesByClass.get(clazz); + checkArgument(blockEncoding != null, "Unknown block encoding for block: %s", clazz.getName()); + return blockEncoding; + } + public void addBlockEncoding(BlockEncoding blockEncoding) { requireNonNull(blockEncoding, "blockEncoding is null"); - BlockEncoding existingEntry = blockEncodings.putIfAbsent(blockEncoding.getName(), blockEncoding); - checkArgument(existingEntry == null, "Encoding already registered: %s", blockEncoding.getName()); + BlockEncoding existingEntryByClass = blockEncodingNamesByClass.putIfAbsent(blockEncoding.getBlockClass(), blockEncoding); + checkArgument(existingEntryByClass == null, "Encoding already registered: %s", blockEncoding.getName()); + BlockEncoding existingEntryByName = blockEncodingsByName.putIfAbsent(blockEncoding.getName(), blockEncoding); + checkArgument(existingEntryByName == null, "Encoding already registered: %s", blockEncoding.getName()); } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/InternalBlockEncodingSerde.java b/core/trino-main/src/main/java/io/trino/metadata/InternalBlockEncodingSerde.java index c122b9e880f4..0acaee166c64 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/InternalBlockEncodingSerde.java +++ b/core/trino-main/src/main/java/io/trino/metadata/InternalBlockEncodingSerde.java @@ -27,25 +27,28 @@ import java.util.Optional; import java.util.function.Function; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; public final class InternalBlockEncodingSerde implements BlockEncodingSerde { - private final Function blockEncodings; + private final Function nameToEncoding; // for deserialization + private final Function, BlockEncoding> blockToEncoding; // for serialization private final Function types; @Inject public InternalBlockEncodingSerde(BlockEncodingManager blockEncodingManager, TypeManager typeManager) { - this(blockEncodingManager::getBlockEncoding, typeManager::getType); + this(blockEncodingManager::getBlockEncodingByName, blockEncodingManager::getBlockEncodingByBlockClass, typeManager::getType); } @VisibleForTesting - InternalBlockEncodingSerde(Function blockEncodings, Function types) + InternalBlockEncodingSerde(Function nameToEncoding, Function, BlockEncoding> blockToEncoding, Function types) { - this.blockEncodings = requireNonNull(blockEncodings, "blockEncodings is null"); + this.nameToEncoding = requireNonNull(nameToEncoding, "nameToEncoding is null"); + this.blockToEncoding = requireNonNull(blockToEncoding, "blockToEncoding is null"); this.types = requireNonNull(types, "types is null"); } @@ -56,7 +59,7 @@ public Block readBlock(SliceInput input) String encodingName = readLengthPrefixedString(input); // look up the encoding factory - BlockEncoding blockEncoding = blockEncodings.apply(encodingName); + BlockEncoding blockEncoding = nameToEncoding.apply(encodingName); // load read the encoding factory from the output stream return blockEncoding.readBlock(this, input); @@ -66,11 +69,8 @@ public Block readBlock(SliceInput input) public void writeBlock(SliceOutput output, Block block) { while (true) { - // get the encoding name - String encodingName = block.getEncodingName(); - // look up the BlockEncoding - BlockEncoding blockEncoding = blockEncodings.apply(encodingName); + BlockEncoding blockEncoding = blockToEncoding.apply(block.getClass()); // see if a replacement block should be written instead Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); @@ -80,7 +80,7 @@ public void writeBlock(SliceOutput output, Block block) } // write the name to the output - writeLengthPrefixedString(output, encodingName); + writeLengthPrefixedString(output, blockEncoding.getName()); // write the block to the output blockEncoding.writeBlock(this, output, block); @@ -89,6 +89,24 @@ public void writeBlock(SliceOutput output, Block block) } } + @Override + public long estimatedWriteSize(Block block) + { + while (true) { + BlockEncoding blockEncoding = blockToEncoding.apply(block.getClass()); + // see if a replacement block should be written instead + Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); + if (replacementBlock.isPresent()) { + block = replacementBlock.get(); + continue; + } + + // length of encoding name + encoding name + block size + // TODO: improve this estimate by adding estimatedWriteSize to BlockEncoding interface + return SIZE_OF_INT + blockEncoding.getName().length() + block.getSizeInBytes(); + } + } + @Override public Type readType(SliceInput sliceInput) { diff --git a/core/trino-main/src/main/java/io/trino/sql/routine/SqlRoutineHash.java b/core/trino-main/src/main/java/io/trino/sql/routine/SqlRoutineHash.java index d85c778a6619..ba1c27b8ace1 100644 --- a/core/trino-main/src/main/java/io/trino/sql/routine/SqlRoutineHash.java +++ b/core/trino-main/src/main/java/io/trino/sql/routine/SqlRoutineHash.java @@ -234,7 +234,7 @@ public Void visitConstant(ConstantExpression literal, Void context) case Slice sliceValue -> hasher.putBytes(sliceValue.getBytes()); default -> { Block block = literal.getBlockValue(); - SliceOutput output = new DynamicSliceOutput(toIntExact(block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES))); + SliceOutput output = new DynamicSliceOutput(toIntExact(blockEncodingSerde.estimatedWriteSize(block))); blockEncodingSerde.writeBlock(output, block); hasher.putBytes(output.slice().getBytes()); } diff --git a/core/trino-main/src/test/java/io/trino/block/TestRunLengthEncodedBlock.java b/core/trino-main/src/test/java/io/trino/block/TestRunLengthEncodedBlock.java index d8062387f643..3d7d525d935e 100644 --- a/core/trino-main/src/test/java/io/trino/block/TestRunLengthEncodedBlock.java +++ b/core/trino-main/src/test/java/io/trino/block/TestRunLengthEncodedBlock.java @@ -19,7 +19,6 @@ import io.trino.spi.block.ByteArrayBlockBuilder; import io.trino.spi.block.IntArrayBlockBuilder; import io.trino.spi.block.LongArrayBlockBuilder; -import io.trino.spi.block.RunLengthBlockEncoding; import io.trino.spi.block.RunLengthEncodedBlock; import io.trino.spi.block.ShortArrayBlockBuilder; import io.trino.spi.block.VariableWidthBlock; @@ -87,7 +86,6 @@ public void testBuildingFromLongArrayBlockBuilder() { LongArrayBlockBuilder blockBuilder = new LongArrayBlockBuilder(null, 100); populateNullValues(blockBuilder, 100); - assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME); } @Test @@ -95,7 +93,6 @@ public void testBuildingFromIntArrayBlockBuilder() { IntArrayBlockBuilder blockBuilder = new IntArrayBlockBuilder(null, 100); populateNullValues(blockBuilder, 100); - assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME); } @Test @@ -103,7 +100,6 @@ public void testBuildingFromShortArrayBlockBuilder() { ShortArrayBlockBuilder blockBuilder = new ShortArrayBlockBuilder(null, 100); populateNullValues(blockBuilder, 100); - assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME); } @Test @@ -111,7 +107,6 @@ public void testBuildingFromByteArrayBlockBuilder() { ByteArrayBlockBuilder blockBuilder = new ByteArrayBlockBuilder(null, 100); populateNullValues(blockBuilder, 100); - assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME); } @Test diff --git a/core/trino-main/src/test/java/io/trino/metadata/TestInternalBlockEncodingSerde.java b/core/trino-main/src/test/java/io/trino/metadata/TestInternalBlockEncodingSerde.java index 5772d4e7312b..72b652df3954 100644 --- a/core/trino-main/src/test/java/io/trino/metadata/TestInternalBlockEncodingSerde.java +++ b/core/trino-main/src/test/java/io/trino/metadata/TestInternalBlockEncodingSerde.java @@ -20,6 +20,7 @@ import io.trino.spi.block.BlockBuilder; import io.trino.spi.block.BlockEncoding; import io.trino.spi.block.BlockEncodingSerde; +import io.trino.spi.block.VariableWidthBlock; import io.trino.spi.block.VariableWidthBlockEncoding; import io.trino.spi.type.TestingTypeManager; import io.trino.spi.type.Type; @@ -35,7 +36,8 @@ public class TestInternalBlockEncodingSerde { private final TestingTypeManager testingTypeManager = new TestingTypeManager(); private final Map blockEncodings = ImmutableMap.of(VariableWidthBlockEncoding.NAME, new VariableWidthBlockEncoding()); - private final BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodings::get, testingTypeManager::getType); + private final Map, BlockEncoding> blockNames = ImmutableMap.of(VariableWidthBlock.class, new VariableWidthBlockEncoding()); + private final BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodings::get, blockNames::get, testingTypeManager::getType); @Test public void blockRoundTrip() diff --git a/core/trino-spi/pom.xml b/core/trino-spi/pom.xml index 09f158f24265..e8a64bac6a9a 100644 --- a/core/trino-spi/pom.xml +++ b/core/trino-spi/pom.xml @@ -221,64 +221,92 @@ true java.method.removed - method java.lang.Iterable<io.trino.spi.eventlistener.EventListener> io.trino.spi.connector.Connector::getEventListeners() - Remove connector event listeners + method java.lang.String io.trino.spi.block.ArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method byte[] io.trino.spi.type.SqlVarbinary::getBytes() - method byte[] io.trino.spi.type.SqlVarbinary::getBytes() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.Block::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.addedToInterface + method java.lang.Class<? extends io.trino.spi.block.Block> io.trino.spi.block.BlockEncoding::getBlockClass() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.ByteArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.DictionaryBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.Fixed12Block::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.Int128ArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlDate::toString() - method java.lang.String io.trino.spi.type.SqlDate::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.IntArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlDecimal::toString() - method java.lang.String io.trino.spi.type.SqlDecimal::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.LazyBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlTime::toString() - method java.lang.String io.trino.spi.type.SqlTime::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.LongArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlTimeWithTimeZone::toString() - method java.lang.String io.trino.spi.type.SqlTimeWithTimeZone::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.MapBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlTimestamp::toString() - method java.lang.String io.trino.spi.type.SqlTimestamp::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.RowBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings true - java.annotation.removed - method java.lang.String io.trino.spi.type.SqlTimestampWithTimeZone::toString() - method java.lang.String io.trino.spi.type.SqlTimestampWithTimeZone::toString() - @com.fasterxml.jackson.annotation.JsonValue - On-the-wire representation shouldn't rely on the Jackson format + java.method.removed + method java.lang.String io.trino.spi.block.RunLengthEncodedBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.ShortArrayBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings + + + true + java.method.removed + method java.lang.String io.trino.spi.block.VariableWidthBlock::getEncodingName() + Removal of bidrectional coupling of Blocks and BlockEncodings diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlock.java index a10f87c79e25..5e079cf5c402 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlock.java @@ -192,12 +192,6 @@ int getOffsetBase() return arrayOffset; } - @Override - public String getEncodingName() - { - return ArrayBlockEncoding.NAME; - } - @Override public boolean mayHaveNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockEncoding.java index ad01d4128132..2a1385319b54 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ArrayBlockEncoding.java @@ -31,6 +31,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return ArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java index 63cc3afc7ab4..ea55eb882aac 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Block.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Block.java @@ -94,11 +94,6 @@ public sealed interface Block */ void retainedBytesForEachPart(ObjLongConsumer consumer); - /** - * Get the encoding for this block. - */ - String getEncodingName(); - /** * Create a new block from the current block by keeping the same elements only with respect * to {@code positions} that starts at {@code offset} and has length of {@code length}. The diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncoding.java index 8b199fcd9678..6e2dfb7910c2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncoding.java @@ -25,6 +25,8 @@ public interface BlockEncoding */ String getName(); + Class getBlockClass(); + /** * Read a block from the specified input. The returned * block should begin at the specified position. diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncodingSerde.java b/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncodingSerde.java index 9111d8eaedba..5cd60dc46ccb 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncodingSerde.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/BlockEncodingSerde.java @@ -29,6 +29,14 @@ public interface BlockEncodingSerde */ void writeBlock(SliceOutput output, Block block); + /** + * Estimate the size of the block when serialized to the on-the-wire representation. + */ + default long estimatedWriteSize(Block block) + { + return block.getSizeInBytes(); + } + /** * Reads a type from the input. */ diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlock.java index 208a5265961a..e7dba0d9277c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlock.java @@ -215,12 +215,6 @@ public ByteArrayBlock copyRegion(int positionOffset, int length) return new ByteArrayBlock(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return ByteArrayBlockEncoding.NAME; - } - @Override public ByteArrayBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java index 17f346f4e440..b7af69105119 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ByteArrayBlockEncoding.java @@ -33,6 +33,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return ByteArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java index 61425cdb21a6..e5f2a09402c6 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlock.java @@ -278,12 +278,6 @@ public void retainedBytesForEachPart(ObjLongConsumer consumer) consumer.accept(this, INSTANCE_SIZE); } - @Override - public String getEncodingName() - { - return DictionaryBlockEncoding.NAME; - } - @Override public Block copyPositions(int[] positions, int offset, int length) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java index 75b0b6818d6a..5880f46eed1a 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/DictionaryBlockEncoding.java @@ -29,6 +29,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return DictionaryBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12Block.java b/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12Block.java index 6f329259b14b..7ddbde46838c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12Block.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12Block.java @@ -223,12 +223,6 @@ public Fixed12Block copyRegion(int positionOffset, int length) return new Fixed12Block(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return Fixed12BlockEncoding.NAME; - } - @Override public Fixed12Block copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12BlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12BlockEncoding.java index 5d1799f83c4a..d1a14c20458c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12BlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Fixed12BlockEncoding.java @@ -30,6 +30,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return Fixed12Block.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlock.java index 6be83a5aea69..990b4449ab27 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlock.java @@ -215,12 +215,6 @@ public Int128ArrayBlock copyRegion(int positionOffset, int length) return new Int128ArrayBlock(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return Int128ArrayBlockEncoding.NAME; - } - @Override public Int128ArrayBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlockEncoding.java index 55539440d8cd..4fda6eb3a16d 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/Int128ArrayBlockEncoding.java @@ -30,6 +30,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return Int128ArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlock.java index 77c84b2b76b5..5712e0d4d6c2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlock.java @@ -198,12 +198,6 @@ public IntArrayBlock copyRegion(int positionOffset, int length) return new IntArrayBlock(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return IntArrayBlockEncoding.NAME; - } - @Override public IntArrayBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java index 408475020e9a..383e8fc0f666 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/IntArrayBlockEncoding.java @@ -32,6 +32,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return IntArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlock.java index 0c5bd1a5ce8b..a0159b064a5c 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlock.java @@ -113,12 +113,6 @@ public void retainedBytesForEachPart(ObjLongConsumer consumer) consumer.accept(this, INSTANCE_SIZE); } - @Override - public String getEncodingName() - { - return LazyBlockEncoding.NAME; - } - @Override public Block copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlockEncoding.java index 99a3df02b65a..698e0641a083 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/LazyBlockEncoding.java @@ -29,6 +29,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return LazyBlock.class; + } + @Override public Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput input) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlock.java index 12c57837a2c3..a181ace9ba5e 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlock.java @@ -197,12 +197,6 @@ public LongArrayBlock copyRegion(int positionOffset, int length) return new LongArrayBlock(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return LongArrayBlockEncoding.NAME; - } - @Override public LongArrayBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java index 5167fca68087..22c9a552fdc1 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/LongArrayBlockEncoding.java @@ -32,6 +32,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return LongArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/MapBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/MapBlock.java index f95ce3f09bac..d9c5d6658eff 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/MapBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/MapBlock.java @@ -344,12 +344,6 @@ private int getOffset(int position) return offsets[position + startOffset]; } - @Override - public String getEncodingName() - { - return MapBlockEncoding.NAME; - } - @Override public MapBlock copyPositions(int[] positions, int offset, int length) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/MapBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/MapBlockEncoding.java index 2442219f1bac..38d4bccfd7c3 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/MapBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/MapBlockEncoding.java @@ -36,6 +36,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return MapBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java index b2b98eb6b3ce..bc7bab51c1e2 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlock.java @@ -254,12 +254,6 @@ public RowBlock copyWithAppendedNull() return new RowBlock(positionCount + 1, newRowIsNull, newBlocks, fixedSizePerRow); } - @Override - public String getEncodingName() - { - return RowBlockEncoding.NAME; - } - @Override public RowBlock copyPositions(int[] positions, int offset, int length) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlockEncoding.java index 32d29086402d..044793638a90 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/RowBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/RowBlockEncoding.java @@ -30,6 +30,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return RowBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthBlockEncoding.java index b9cfc8acaf8f..6cfc282fc893 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthBlockEncoding.java @@ -27,6 +27,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return RunLengthEncodedBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java index 22393bea9fbc..cd03ac2a46dc 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/RunLengthEncodedBlock.java @@ -137,12 +137,6 @@ public void retainedBytesForEachPart(ObjLongConsumer consumer) consumer.accept(this, INSTANCE_SIZE); } - @Override - public String getEncodingName() - { - return RunLengthBlockEncoding.NAME; - } - @Override public Block getPositions(int[] positions, int offset, int length) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlock.java index 80977c3980c2..811572f931fc 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlock.java @@ -197,12 +197,6 @@ public ShortArrayBlock copyRegion(int positionOffset, int length) return new ShortArrayBlock(0, length, newValueIsNull, newValues); } - @Override - public String getEncodingName() - { - return ShortArrayBlockEncoding.NAME; - } - @Override public ShortArrayBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java index 15813a428f74..36dc05b20d09 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/ShortArrayBlockEncoding.java @@ -32,6 +32,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return ShortArrayBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java index c3dd96c72349..6cd81a848927 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlock.java @@ -281,12 +281,6 @@ public VariableWidthBlock copyRegion(int positionOffset, int length) return new VariableWidthBlock(0, length, newSlice, newOffsets, newValueIsNull); } - @Override - public String getEncodingName() - { - return VariableWidthBlockEncoding.NAME; - } - @Override public VariableWidthBlock copyWithAppendedNull() { diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java index 6e8af40a5b44..71ee1af5f43d 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java +++ b/core/trino-spi/src/main/java/io/trino/spi/block/VariableWidthBlockEncoding.java @@ -35,6 +35,12 @@ public String getName() return NAME; } + @Override + public Class getBlockClass() + { + return VariableWidthBlock.class; + } + @Override public void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput sliceOutput, Block block) { diff --git a/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java b/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java index 9ab2e668091a..46698df39615 100644 --- a/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java +++ b/core/trino-spi/src/test/java/io/trino/spi/block/TestingBlockEncodingSerde.java @@ -25,6 +25,7 @@ import java.util.function.Function; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Objects.requireNonNull; @@ -33,7 +34,8 @@ public final class TestingBlockEncodingSerde implements BlockEncodingSerde { private final Function types; - private final ConcurrentMap blockEncodings = new ConcurrentHashMap<>(); + private final ConcurrentMap, BlockEncoding> blockEncodingsByClass = new ConcurrentHashMap<>(); + private final ConcurrentMap blockEncodingsByName = new ConcurrentHashMap<>(); public TestingBlockEncodingSerde() { @@ -61,7 +63,8 @@ public TestingBlockEncodingSerde(Function types) private void addBlockEncoding(BlockEncoding blockEncoding) { - blockEncodings.put(blockEncoding.getName(), blockEncoding); + blockEncodingsByClass.put(blockEncoding.getBlockClass(), blockEncoding); + blockEncodingsByName.put(blockEncoding.getName(), blockEncoding); } @Override @@ -71,7 +74,7 @@ public Block readBlock(SliceInput input) String encodingName = readLengthPrefixedString(input); // look up the encoding factory - BlockEncoding blockEncoding = blockEncodings.get(encodingName); + BlockEncoding blockEncoding = blockEncodingsByName.get(encodingName); checkArgument(blockEncoding != null, "Unknown block encoding %s", encodingName); // load read the encoding factory from the output stream @@ -82,11 +85,8 @@ public Block readBlock(SliceInput input) public void writeBlock(SliceOutput output, Block block) { while (true) { - // get the encoding name - String encodingName = block.getEncodingName(); - // look up the encoding factory - BlockEncoding blockEncoding = blockEncodings.get(encodingName); + BlockEncoding blockEncoding = blockEncodingsByClass.get(block.getClass()); // see if a replacement block should be written instead Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); @@ -96,7 +96,7 @@ public void writeBlock(SliceOutput output, Block block) } // write the name to the output - writeLengthPrefixedString(output, encodingName); + writeLengthPrefixedString(output, blockEncoding.getName()); // write the block to the output blockEncoding.writeBlock(this, output, block); @@ -105,6 +105,23 @@ public void writeBlock(SliceOutput output, Block block) } } + @Override + public long estimatedWriteSize(Block block) + { + while (true) { + BlockEncoding blockEncoding = blockEncodingsByClass.get(block.getClass()); + // see if a replacement block should be written instead + Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); + if (replacementBlock.isPresent()) { + block = replacementBlock.get(); + continue; + } + // length of encoding name + encoding name + block size + // TODO: improve this estimate by adding estimatedWriteSize to BlockEncoding interface + return SIZE_OF_INT + blockEncoding.getName().length() + block.getSizeInBytes(); + } + } + @Override public Type readType(SliceInput sliceInput) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveBlockEncodingSerde.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveBlockEncodingSerde.java index b031c354bba7..821595efd5d3 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveBlockEncodingSerde.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveBlockEncodingSerde.java @@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentMap; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.SIZE_OF_INT; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -60,7 +61,9 @@ public final class HiveBlockEncodingSerde implements BlockEncodingSerde { private static final List TYPES = ImmutableList.of(BOOLEAN, BIGINT, DOUBLE, INTEGER, VARCHAR, VARBINARY, TIMESTAMP_MILLIS, TIMESTAMP_TZ_MILLIS, DATE, HYPER_LOG_LOG); - private final ConcurrentMap blockEncodings = new ConcurrentHashMap<>(); + + private final ConcurrentMap blockEncodingsByName = new ConcurrentHashMap<>(); + private final ConcurrentMap, BlockEncoding> blockEncodingsByClass = new ConcurrentHashMap<>(); @Inject public HiveBlockEncodingSerde() @@ -82,7 +85,8 @@ public HiveBlockEncodingSerde() private void addBlockEncoding(BlockEncoding blockEncoding) { - blockEncodings.put(blockEncoding.getName(), blockEncoding); + blockEncodingsByClass.put(blockEncoding.getBlockClass(), blockEncoding); + blockEncodingsByName.put(blockEncoding.getName(), blockEncoding); } @Override @@ -92,7 +96,7 @@ public Block readBlock(SliceInput input) String encodingName = readLengthPrefixedString(input); // look up the encoding factory - BlockEncoding blockEncoding = blockEncodings.get(encodingName); + BlockEncoding blockEncoding = blockEncodingsByName.get(encodingName); checkArgument(blockEncoding != null, "Unknown block encoding %s", encodingName); // load read the encoding factory from the output stream @@ -100,15 +104,29 @@ public Block readBlock(SliceInput input) } @Override - public void writeBlock(SliceOutput output, Block block) + public long estimatedWriteSize(Block block) { while (true) { - // get the encoding name - String encodingName = block.getEncodingName(); + BlockEncoding blockEncoding = blockEncodingsByClass.get(block.getClass()); + // see if a replacement block should be written instead + Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); + if (replacementBlock.isPresent()) { + block = replacementBlock.get(); + continue; + } + // length of encoding name + encoding name + block size + // TODO: improve this estimate by adding estimatedWriteSize to BlockEncoding interface + return SIZE_OF_INT + blockEncoding.getName().length() + block.getSizeInBytes(); + } + } + @Override + public void writeBlock(SliceOutput output, Block block) + { + while (true) { // look up the encoding factory - BlockEncoding blockEncoding = blockEncodings.get(encodingName); - checkArgument(blockEncoding != null, "Cannot write block %s with encoding %s", block, encodingName); + BlockEncoding blockEncoding = blockEncodingsByClass.get(block.getClass()); + checkArgument(blockEncoding != null, "Cannot write block %s as no encoding was found", block); // see if a replacement block should be written instead Optional replacementBlock = blockEncoding.replacementBlockForWrite(block); @@ -118,7 +136,7 @@ public void writeBlock(SliceOutput output, Block block) } // write the name to the output - writeLengthPrefixedString(output, encodingName); + writeLengthPrefixedString(output, blockEncoding.getName()); // write the block to the output blockEncoding.writeBlock(this, output, block);