Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ public Serializer(BlockEncodingSerde blockEncodingSerde)
public void serialize(Block block, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException
{
// Encoding name is length prefixed as are many block encodings
SliceOutput output = new DynamicSliceOutput(toIntExact(block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES)));
SliceOutput output = new DynamicSliceOutput(toIntExact(blockEncodingSerde.estimatedWriteSize(block)));
writeBlock(blockEncodingSerde, output, block);
Slice slice = output.slice();
jsonGenerator.writeBinary(Base64Variants.MIME_NO_LINEFEEDS, slice.byteArray(), slice.byteArrayOffset(), slice.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.metadata;

import io.trino.spi.block.ArrayBlockEncoding;
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockEncoding;
import io.trino.spi.block.ByteArrayBlockEncoding;
import io.trino.spi.block.DictionaryBlockEncoding;
Expand All @@ -36,7 +37,10 @@

public final class BlockEncodingManager
{
private final Map<String, BlockEncoding> blockEncodings = new ConcurrentHashMap<>();
// for deserialization
private final Map<String, BlockEncoding> blockEncodingsByName = new ConcurrentHashMap<>();
// for serialization
private final Map<Class<? extends Block>, BlockEncoding> blockEncodingNamesByClass = new ConcurrentHashMap<>();

public BlockEncodingManager()
{
Expand All @@ -56,17 +60,26 @@ public BlockEncodingManager()
addBlockEncoding(new LazyBlockEncoding());
}

public BlockEncoding getBlockEncoding(String encodingName)
public BlockEncoding getBlockEncodingByName(String encodingName)
{
BlockEncoding blockEncoding = blockEncodings.get(encodingName);
BlockEncoding blockEncoding = blockEncodingsByName.get(encodingName);
checkArgument(blockEncoding != null, "Unknown block encoding: %s", encodingName);
return blockEncoding;
}

public BlockEncoding getBlockEncodingByBlockClass(Class<? extends Block> clazz)
{
BlockEncoding blockEncoding = blockEncodingNamesByClass.get(clazz);
checkArgument(blockEncoding != null, "Unknown block encoding for block: %s", clazz.getName());
return blockEncoding;
}

public void addBlockEncoding(BlockEncoding blockEncoding)
{
requireNonNull(blockEncoding, "blockEncoding is null");
BlockEncoding existingEntry = blockEncodings.putIfAbsent(blockEncoding.getName(), blockEncoding);
checkArgument(existingEntry == null, "Encoding already registered: %s", blockEncoding.getName());
BlockEncoding existingEntryByClass = blockEncodingNamesByClass.putIfAbsent(blockEncoding.getBlockClass(), blockEncoding);
checkArgument(existingEntryByClass == null, "Encoding already registered: %s", blockEncoding.getName());
BlockEncoding existingEntryByName = blockEncodingsByName.putIfAbsent(blockEncoding.getName(), blockEncoding);
checkArgument(existingEntryByName == null, "Encoding already registered: %s", blockEncoding.getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,28 @@
import java.util.Optional;
import java.util.function.Function;

import static io.airlift.slice.SizeOf.SIZE_OF_INT;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

public final class InternalBlockEncodingSerde
implements BlockEncodingSerde
{
private final Function<String, BlockEncoding> blockEncodings;
private final Function<String, BlockEncoding> nameToEncoding; // for deserialization
private final Function<Class<? extends Block>, BlockEncoding> blockToEncoding; // for serialization
private final Function<TypeId, Type> types;

@Inject
public InternalBlockEncodingSerde(BlockEncodingManager blockEncodingManager, TypeManager typeManager)
{
this(blockEncodingManager::getBlockEncoding, typeManager::getType);
this(blockEncodingManager::getBlockEncodingByName, blockEncodingManager::getBlockEncodingByBlockClass, typeManager::getType);
}

@VisibleForTesting
InternalBlockEncodingSerde(Function<String, BlockEncoding> blockEncodings, Function<TypeId, Type> types)
InternalBlockEncodingSerde(Function<String, BlockEncoding> nameToEncoding, Function<Class<? extends Block>, BlockEncoding> blockToEncoding, Function<TypeId, Type> types)
{
this.blockEncodings = requireNonNull(blockEncodings, "blockEncodings is null");
this.nameToEncoding = requireNonNull(nameToEncoding, "nameToEncoding is null");
this.blockToEncoding = requireNonNull(blockToEncoding, "blockToEncoding is null");
this.types = requireNonNull(types, "types is null");
}

Expand All @@ -56,7 +59,7 @@ public Block readBlock(SliceInput input)
String encodingName = readLengthPrefixedString(input);

// look up the encoding factory
BlockEncoding blockEncoding = blockEncodings.apply(encodingName);
BlockEncoding blockEncoding = nameToEncoding.apply(encodingName);

// load read the encoding factory from the output stream
return blockEncoding.readBlock(this, input);
Expand All @@ -66,11 +69,8 @@ public Block readBlock(SliceInput input)
public void writeBlock(SliceOutput output, Block block)
{
while (true) {
// get the encoding name
String encodingName = block.getEncodingName();

// look up the BlockEncoding
BlockEncoding blockEncoding = blockEncodings.apply(encodingName);
BlockEncoding blockEncoding = blockToEncoding.apply(block.getClass());

// see if a replacement block should be written instead
Optional<Block> replacementBlock = blockEncoding.replacementBlockForWrite(block);
Expand All @@ -80,7 +80,7 @@ public void writeBlock(SliceOutput output, Block block)
}

// write the name to the output
writeLengthPrefixedString(output, encodingName);
writeLengthPrefixedString(output, blockEncoding.getName());

// write the block to the output
blockEncoding.writeBlock(this, output, block);
Expand All @@ -89,6 +89,24 @@ public void writeBlock(SliceOutput output, Block block)
}
}

@Override
public long estimatedWriteSize(Block block)
{
while (true) {
BlockEncoding blockEncoding = blockToEncoding.apply(block.getClass());
// see if a replacement block should be written instead
Optional<Block> replacementBlock = blockEncoding.replacementBlockForWrite(block);
if (replacementBlock.isPresent()) {
block = replacementBlock.get();
continue;
}

// length of encoding name + encoding name + block size
// TODO: improve this estimate by adding estimatedWriteSize to BlockEncoding interface
return SIZE_OF_INT + blockEncoding.getName().length() + block.getSizeInBytes();
}
}

@Override
public Type readType(SliceInput sliceInput)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public Void visitConstant(ConstantExpression literal, Void context)
case Slice sliceValue -> hasher.putBytes(sliceValue.getBytes());
default -> {
Block block = literal.getBlockValue();
SliceOutput output = new DynamicSliceOutput(toIntExact(block.getSizeInBytes() + block.getEncodingName().length() + (2 * Integer.BYTES)));
SliceOutput output = new DynamicSliceOutput(toIntExact(blockEncodingSerde.estimatedWriteSize(block)));
blockEncodingSerde.writeBlock(output, block);
hasher.putBytes(output.slice().getBytes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.trino.spi.block.ByteArrayBlockBuilder;
import io.trino.spi.block.IntArrayBlockBuilder;
import io.trino.spi.block.LongArrayBlockBuilder;
import io.trino.spi.block.RunLengthBlockEncoding;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ShortArrayBlockBuilder;
import io.trino.spi.block.VariableWidthBlock;
Expand Down Expand Up @@ -87,31 +86,27 @@ public void testBuildingFromLongArrayBlockBuilder()
{
LongArrayBlockBuilder blockBuilder = new LongArrayBlockBuilder(null, 100);
populateNullValues(blockBuilder, 100);
assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME);
}

@Test
public void testBuildingFromIntArrayBlockBuilder()
{
IntArrayBlockBuilder blockBuilder = new IntArrayBlockBuilder(null, 100);
populateNullValues(blockBuilder, 100);
assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME);
}

@Test
public void testBuildingFromShortArrayBlockBuilder()
{
ShortArrayBlockBuilder blockBuilder = new ShortArrayBlockBuilder(null, 100);
populateNullValues(blockBuilder, 100);
assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME);
}

@Test
public void testBuildingFromByteArrayBlockBuilder()
{
ByteArrayBlockBuilder blockBuilder = new ByteArrayBlockBuilder(null, 100);
populateNullValues(blockBuilder, 100);
assertThat(blockBuilder.build().getEncodingName()).isEqualTo(RunLengthBlockEncoding.NAME);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.BlockEncoding;
import io.trino.spi.block.BlockEncodingSerde;
import io.trino.spi.block.VariableWidthBlock;
import io.trino.spi.block.VariableWidthBlockEncoding;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.Type;
Expand All @@ -35,7 +36,8 @@ public class TestInternalBlockEncodingSerde
{
private final TestingTypeManager testingTypeManager = new TestingTypeManager();
private final Map<String, BlockEncoding> blockEncodings = ImmutableMap.of(VariableWidthBlockEncoding.NAME, new VariableWidthBlockEncoding());
private final BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodings::get, testingTypeManager::getType);
private final Map<Class<? extends Block>, BlockEncoding> blockNames = ImmutableMap.of(VariableWidthBlock.class, new VariableWidthBlockEncoding());
private final BlockEncodingSerde blockEncodingSerde = new InternalBlockEncodingSerde(blockEncodings::get, blockNames::get, testingTypeManager::getType);

@Test
public void blockRoundTrip()
Expand Down
Loading