diff --git a/presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/QueryBuilder.java b/presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/QueryBuilder.java index 3913d3a8c0f10..1bdddded24283 100644 --- a/presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/QueryBuilder.java +++ b/presto-base-jdbc/src/main/java/com/facebook/presto/plugin/jdbc/QueryBuilder.java @@ -159,12 +159,14 @@ else if (typeAndValue.getType().equals(TimeType.TIME)) { } else if (typeAndValue.getType().equals(TimeWithTimeZoneType.TIME_WITH_TIME_ZONE)) { statement.setTime(i + 1, new Time(unpackMillisUtc((long) typeAndValue.getValue()))); + //statement.setObject(i + 1, OffsetTime.of(typeAndValue.getValue())); } else if (typeAndValue.getType().equals(TimestampType.TIMESTAMP)) { statement.setTimestamp(i + 1, new Timestamp((long) typeAndValue.getValue())); } else if (typeAndValue.getType().equals(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE)) { statement.setTimestamp(i + 1, new Timestamp(unpackMillisUtc((long) typeAndValue.getValue()))); + //statement.setObject(i + 1, ZonedDateTime.of()); } else if (typeAndValue.getType() instanceof VarcharType) { statement.setString(i + 1, ((Slice) typeAndValue.getValue()).toStringUtf8()); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java index 913beb31ac062..58a6c0f4df38e 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java @@ -22,9 +22,11 @@ import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.block.InterleavedBlockBuilder; import com.facebook.presto.spi.predicate.TupleDomain; import 
com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.Decimals; +import com.facebook.presto.spi.type.NamedTypeSignature; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; import com.google.common.collect.ImmutableList; @@ -58,7 +60,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -168,7 +172,8 @@ public ParquetHiveRecordCursor( columns, useParquetColumnNames, predicatePushdownEnabled, - effectivePredicate); + effectivePredicate, + typeManager); } @Override @@ -319,7 +324,8 @@ private ParquetRecordReader createParquetRecordReader( List columns, boolean useParquetColumnNames, boolean predicatePushdownEnabled, - TupleDomain effectivePredicate) + TupleDomain effectivePredicate, + TypeManager typeManager) { ParquetDataSource dataSource = null; try { @@ -330,7 +336,7 @@ private ParquetRecordReader createParquetRecordReader( FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); - PrestoReadSupport readSupport = new PrestoReadSupport(useParquetColumnNames, columns, fileSchema); + PrestoReadSupport readSupport = new PrestoReadSupport(useParquetColumnNames, columns, fileSchema, typeManager); List fields = columns.stream() .filter(column -> column.getColumnType() == REGULAR) @@ -406,7 +412,7 @@ public final class PrestoReadSupport private final List columns; private final List converters; - public PrestoReadSupport(boolean useParquetColumnNames, List columns, MessageType messageType) + public PrestoReadSupport(boolean useParquetColumnNames, List columns, MessageType messageType, TypeManager typeManager) { this.columns = columns; this.useParquetColumnNames = useParquetColumnNames; @@ -429,7 +435,7 @@ public PrestoReadSupport(boolean 
useParquetColumnNames, List c } } else { - converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i), i)); + converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i, useParquetColumnNames, typeManager), i)); } } } @@ -682,43 +688,78 @@ private interface BlockConverter void afterValue(); } + private interface IndexedBlockConverter + extends BlockConverter + { + void setFieldIndex(int fieldIndex); + int getFieldIndex(); + Type getType(); + } + private abstract static class GroupedConverter extends GroupConverter - implements BlockConverter + implements IndexedBlockConverter { public abstract Block getBlock(); } - private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex) + private static IndexedBlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex, boolean useNames, TypeManager typeManager) { if (parquetType.isPrimitive()) { if (parquetType.getOriginalType() == DECIMAL) { DecimalMetadata decimalMetadata = ((PrimitiveType) parquetType).getDecimalMetadata(); - return new ParquetDecimalConverter(createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale())); + return new ParquetDecimalConverter(createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()), fieldIndex); } else { return new ParquetPrimitiveConverter(prestoType, fieldIndex); } } - return createGroupConverter(prestoType, columnName, parquetType, fieldIndex); + return createGroupConverter(prestoType, columnName, parquetType, fieldIndex, useNames, typeManager); } - private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex) + private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex, 
boolean useNames, TypeManager typeManager) { GroupType groupType = parquetType.asGroupType(); switch (prestoType.getTypeSignature().getBase()) { case ARRAY: - return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); case MAP: - return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); case ROW: - return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); default: throw new IllegalArgumentException("Column " + columnName + " type " + parquetType.getOriginalType() + " not supported"); } } + private static class FieldInformation + { + private final Type type; + private final int index; + + private FieldInformation(int index, Type type) + { + this.index = index; + this.type = type; + } + + static FieldInformation of(int index, Type type) + { + return new FieldInformation(index, type); + } + + public Type getType() + { + return type; + } + + public int getIndex() + { + return index; + } + } + private static class ParquetStructConverter extends GroupedConverter { @@ -726,34 +767,106 @@ private static class ParquetStructConverter private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type rowType; - private final int fieldIndex; + private int fieldIndex; - private final List converters; + private final List converters; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex) + private int[] converterIndexes; + private List outOfOrderFieldTypes; + private 
InterleavedBlockBuilder outOfOrderBlockBuilder; // a tmp builder for fields out of order + + private List createConverters(String columnName, List prestoTypeParameters, List fieldTypes, TypeManager typeManager) + { + ImmutableList.Builder converters = ImmutableList.builder(); + for (int i = 0, n = prestoTypeParameters.size(); i < n; i++) { + parquet.schema.Type fieldType = fieldTypes.get(i); + converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i, false, typeManager)); + } + return converters.build(); + } + + private List createConvertersByName(String columnName, Type prestoType, List fieldTypes, TypeManager typeManager) + { + int parquetFieldCount = fieldTypes.size(); + int prestoFieldCount = prestoType.getTypeSignature().getParameters().size(); + + Map fieldTypeMap = new HashMap<>(prestoFieldCount); + for (int i = 0; i < prestoFieldCount; i++) { + NamedTypeSignature namedTypeSignature = prestoType.getTypeSignature().getParameters().get(i).getNamedTypeSignature(); + fieldTypeMap.put(namedTypeSignature.getName().toLowerCase(), FieldInformation.of(i, typeManager.getType(namedTypeSignature.getTypeSignature()))); + } + + boolean outOfOrder = false; + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0, prevFieldIndex = -1; i < parquetFieldCount; i++) { + parquet.schema.Type fieldType = fieldTypes.get(i); + FieldInformation fieldInformation = fieldTypeMap.get(fieldType.getName().toLowerCase()); + if (fieldInformation == null) { + builder.add(fieldType.isPrimitive() ? ParquetDummyPrimitiveConverter.converter : new ParquetDummyGroupConverter(fieldType)); + } + else { + if (prevFieldIndex > fieldInformation.getIndex()) { + outOfOrder = true; + } + builder.add(createConverter(fieldInformation.getType(), columnName + "." 
+ fieldType.getName(), fieldType, fieldInformation.getIndex(), true, typeManager)); + prevFieldIndex = fieldInformation.getIndex(); + } + } + // When reading parquet files, we need to pass converters and parquet types in parquet schema fields order. + // After getting the parquet values, we need to build presto blocks in metastore schema fields order. + List converters = builder.build(); + if (outOfOrder) { + ImmutableList.Builder types = ImmutableList.builder(); + converterIndexes = new int[parquetFieldCount]; + for (int i = 0, tmpIndex = 0; i < parquetFieldCount; i++) { + IndexedBlockConverter converter = converters.get(i); + converterIndexes[i] = converter.getFieldIndex(); + if (converterIndexes[i] < 0) { + continue; + } + converter.setFieldIndex(tmpIndex++); + types.add(converter.getType()); + } + outOfOrderFieldTypes = types.build(); + } + return converters; + } + + public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument(ROW.equals(prestoType.getTypeSignature().getBase())); - List prestoTypeParameters = prestoType.getTypeParameters(); List fieldTypes = entryType.getFields(); - checkArgument( - prestoTypeParameters.size() == fieldTypes.size(), - "Schema mismatch, metastore schema for row column %s has %s fields but parquet schema has %s fields", - columnName, - prestoTypeParameters.size(), - fieldTypes.size()); + + if (useNames || fieldTypes.size() != prestoType.getTypeSignature().getParameters().size()) { + this.converters = createConvertersByName(columnName, prestoType, fieldTypes, typeManager); + } + else { + this.converters = createConverters(columnName, prestoType.getTypeParameters(), fieldTypes, typeManager); + } this.rowType = prestoType; this.fieldIndex = fieldIndex; + } + + @Override + public Type getType() + { + return rowType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } - 
ImmutableList.Builder converters = ImmutableList.builder(); - for (int i = 0; i < prestoTypeParameters.size(); i++) { - parquet.schema.Type fieldType = fieldTypes.get(i); - converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i)); - } - this.converters = converters.build(); + @Override + public int getFieldIndex() + { + return fieldIndex; } @Override @@ -766,6 +879,9 @@ public Converter getConverter(int fieldIndex) public void beforeValue(BlockBuilder builder) { this.builder = builder; + if (outOfOrderFieldTypes != null) { + outOfOrderBlockBuilder = new InterleavedBlockBuilder(outOfOrderFieldTypes, new BlockBuilderStatus(), NULL_BUILDER_POSITIONS_THRESHOLD); + } } @Override @@ -775,15 +891,22 @@ public void start() if (nullBuilder == null || (nullBuilder.getPositionCount() >= NULL_BUILDER_POSITIONS_THRESHOLD && nullBuilder.getSizeInBytes() >= NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD)) { nullBuilder = rowType.createBlockBuilder(new BlockBuilderStatus(), NULL_BUILDER_POSITIONS_THRESHOLD); } - currentEntryBuilder = nullBuilder.beginBlockEntry(); + currentEntryBuilder = nullBuilder; } else { while (builder.getPositionCount() < fieldIndex) { builder.appendNull(); } - currentEntryBuilder = builder.beginBlockEntry(); + currentEntryBuilder = builder; } - for (BlockConverter converter : converters) { + if (outOfOrderBlockBuilder != null) { + currentEntryBuilder = outOfOrderBlockBuilder; + } + else { + currentEntryBuilder = currentEntryBuilder.beginBlockEntry(); + } + + for (IndexedBlockConverter converter : converters) { converter.beforeValue(currentEntryBuilder); } } @@ -794,7 +917,32 @@ public void end() for (BlockConverter converter : converters) { converter.afterValue(); } - while (currentEntryBuilder.getPositionCount() < converters.size()) { + if (outOfOrderBlockBuilder != null) { + if (builder == null) { + currentEntryBuilder = nullBuilder; + } + else { + currentEntryBuilder = builder; + } + + currentEntryBuilder = 
currentEntryBuilder.beginBlockEntry(); + List indexedConverters = new ArrayList<>(converters); + indexedConverters.sort((IndexedBlockConverter converter1, IndexedBlockConverter converter2)->Integer.compare(converterIndexes[converter1.getFieldIndex()], converterIndexes[converter2.getFieldIndex()])); + int positionCount = outOfOrderBlockBuilder.getPositionCount(); + for (int i = 0; i < converterIndexes.length; i++) { + int position = indexedConverters.get(i).getFieldIndex(); + if (converterIndexes[position] < 0 || position >= positionCount) { + continue; + } + while (currentEntryBuilder.getPositionCount() < converterIndexes[position]) { + currentEntryBuilder.appendNull(); + } + outOfOrderBlockBuilder.writePositionTo(position, currentEntryBuilder); + currentEntryBuilder.closeEntry(); + } + } + int fieldCount = rowType.getTypeParameters().size(); + while (currentEntryBuilder.getPositionCount() < fieldCount) { currentEntryBuilder.appendNull(); } @@ -826,14 +974,14 @@ private static class ParquetListConverter private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type arrayType; - private final int fieldIndex; + private int fieldIndex; private final BlockConverter elementConverter; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex) + public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument( listType.getFieldCount() == 1, @@ -859,10 +1007,10 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy // documentation at http://git.io/vOpNz. 
parquet.schema.Type elementType = listType.getType(0); if (isElementType(elementType, listType.getName())) { - elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0); + elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0, useNames, typeManager); } else { - elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType()); + elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType(), useNames, typeManager); } } @@ -888,6 +1036,24 @@ private boolean isElementType(parquet.schema.Type repeatedType, String parentNam return false; } + @Override + public Type getType() + { + return arrayType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; + } + @Override public void beforeValue(BlockBuilder builder) { @@ -956,7 +1122,7 @@ private static class ParquetListEntryConverter private BlockBuilder builder; private int startingPosition; - public ParquetListEntryConverter(Type prestoType, String columnName, GroupType elementType) + public ParquetListEntryConverter(Type prestoType, String columnName, GroupType elementType, boolean useNames, TypeManager typeManager) { checkArgument( elementType.getOriginalType() == null, @@ -970,7 +1136,7 @@ public ParquetListEntryConverter(Type prestoType, String columnName, GroupType e columnName, elementType.getFieldCount()); - elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0); + elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0, useNames, typeManager); } @Override @@ -1018,14 +1184,14 @@ private static class ParquetMapConverter private static final int 
NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type mapType; - private final int fieldIndex; + private int fieldIndex; private final ParquetMapEntryConverter entryConverter; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex) + public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument( mapType.getFieldCount() == 1, @@ -1038,7 +1204,25 @@ public ParquetMapConverter(Type type, String columnName, GroupType mapType, int parquet.schema.Type entryType = mapType.getFields().get(0); - entryConverter = new ParquetMapEntryConverter(type, columnName + ".entry", entryType.asGroupType()); + entryConverter = new ParquetMapEntryConverter(type, columnName + ".entry", entryType.asGroupType(), useNames, typeManager); + } + + @Override + public Type getType() + { + return mapType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; } @Override @@ -1109,7 +1293,7 @@ private static class ParquetMapEntryConverter private BlockBuilder builder; - public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType entryType) + public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType entryType, boolean useNames, TypeManager typeManager) { checkArgument(MAP.equals(prestoType.getTypeSignature().getBase())); // original version of parquet used null for entry due to a bug @@ -1144,8 +1328,8 @@ public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType en columnName, entryGroupType.getType(0)); - keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0); - 
valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1); + keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0, useNames, typeManager); + valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1, useNames, typeManager); } @Override @@ -1193,10 +1377,10 @@ public void afterValue() private static class ParquetPrimitiveConverter extends PrimitiveConverter - implements BlockConverter + implements IndexedBlockConverter { private final Type type; - private final int fieldIndex; + private int fieldIndex; private BlockBuilder builder; public ParquetPrimitiveConverter(Type type, int fieldIndex) @@ -1205,6 +1389,24 @@ public ParquetPrimitiveConverter(Type type, int fieldIndex) this.fieldIndex = fieldIndex; } + @Override + public Type getType() + { + return type; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; + } + @Override public void beforeValue(BlockBuilder builder) { @@ -1307,15 +1509,35 @@ public void addInt(int value) private static class ParquetDecimalConverter extends PrimitiveConverter - implements BlockConverter + implements IndexedBlockConverter { private final DecimalType decimalType; + private int fieldIndex; private BlockBuilder builder; private boolean wroteValue; - public ParquetDecimalConverter(DecimalType decimalType) + public ParquetDecimalConverter(DecimalType decimalType, int fieldIndex) { this.decimalType = requireNonNull(decimalType, "decimalType is null"); + this.fieldIndex = fieldIndex; + } + + @Override + public Type getType() + { + return decimalType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + 
return fieldIndex; } @Override @@ -1366,4 +1588,156 @@ public void addBinary(Binary value) wroteValue = true; } } + + private static class ParquetDummyPrimitiveConverter + extends PrimitiveConverter + implements IndexedBlockConverter + { + static ParquetDummyPrimitiveConverter converter = new ParquetDummyPrimitiveConverter(); + + @Override + public Type getType() + { + return null; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + } + + @Override + public int getFieldIndex() + { + return -1; + } + + @Override + public void beforeValue(BlockBuilder builder) + { + } + + @Override + public void afterValue() + { + } + + @Override + public boolean isPrimitive() + { + return true; + } + + @Override + public PrimitiveConverter asPrimitiveConverter() + { + return this; + } + + @Override + public boolean hasDictionarySupport() + { + return false; + } + + @Override + public void setDictionary(Dictionary dictionary) + { + } + + @Override + public void addValueFromDictionary(int dictionaryId) + { + } + + @Override + public void addBoolean(boolean value) + { + } + + @Override + public void addDouble(double value) + { + } + + @Override + public void addLong(long value) + { + } + + @Override + public void addBinary(Binary value) + { + } + + @Override + public void addFloat(float value) + { + } + + @Override + public void addInt(int value) + { + } + } + + private static class ParquetDummyGroupConverter + extends GroupConverter + implements IndexedBlockConverter + { + private final List converters; + + public ParquetDummyGroupConverter(parquet.schema.Type fieldType) + { + requireNonNull(fieldType, "fieldType is null"); + ImmutableList.Builder builder = ImmutableList.builder(); + for (parquet.schema.Type type : fieldType.asGroupType().getFields()) { + builder.add(type.isPrimitive() ? 
ParquetDummyPrimitiveConverter.converter : new ParquetDummyGroupConverter(type)); + } + converters = builder.build(); + } + + @Override + public void beforeValue(BlockBuilder builder) + { + } + + @Override + public void afterValue() + { + } + + @Override + public void setFieldIndex(int fieldIndex) + { + } + + @Override + public int getFieldIndex() + { + return -1; + } + + @Override + public Type getType() + { + return null; + } + + @Override + public void end() + { + } + + @Override + public Converter getConverter(int fieldIndex) + { + return converters.get(fieldIndex); + } + + @Override + public void start() + { + } + } } diff --git a/presto-hive/src/test/sql/create-test-hive13.sql b/presto-hive/src/test/sql/create-test-hive13.sql index 54747332d993b..9c1adb5dd51f7 100644 --- a/presto-hive/src/test/sql/create-test-hive13.sql +++ b/presto-hive/src/test/sql/create-test-hive13.sql @@ -128,7 +128,6 @@ SELECT FROM presto_test_types_textfile ; - ALTER TABLE presto_test_types_textfile ADD COLUMNS (new_column INT); ALTER TABLE presto_test_types_sequencefile ADD COLUMNS (new_column INT); ALTER TABLE presto_test_types_rctext ADD COLUMNS (new_column INT); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/block/AbstractInterleavedBlock.java b/presto-spi/src/main/java/com/facebook/presto/spi/block/AbstractInterleavedBlock.java new file mode 100644 index 0000000000000..99a5145bbd362 --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/block/AbstractInterleavedBlock.java @@ -0,0 +1,308 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi.block; + +import io.airlift.slice.Slice; + +import java.util.ArrayList; +import java.util.List; + +import static com.facebook.presto.spi.block.BlockUtil.checkArrayRange; + +public abstract class AbstractInterleavedBlock + implements Block +{ + private final int columns; + + protected abstract Block getBlock(int blockIndex); + + protected abstract int toAbsolutePosition(int position); + + @Override + public abstract InterleavedBlockEncoding getEncoding(); + + protected AbstractInterleavedBlock(int columns) + { + if (columns <= 0) { + throw new IllegalArgumentException("Number of blocks in InterleavedBlock must be positive"); + } + this.columns = columns; + } + + int getBlockCount() + { + return columns; + } + + Block[] computeSerializableSubBlocks() + { + InterleavedBlock interleavedBlock = (InterleavedBlock) sliceRange(0, getPositionCount(), false); + Block[] result = new Block[interleavedBlock.getBlockCount()]; + for (int i = 0; i < result.length; i++) { + result[i] = interleavedBlock.getBlock(i); + } + return result; + } + + /** + * Can only be called after the child class is initialized enough that getBlock will return the right value + */ + protected InterleavedBlockEncoding computeBlockEncoding() + { + BlockEncoding[] individualBlockEncodings = new BlockEncoding[columns]; + for (int i = 0; i < columns; i++) { + Block block = getBlock(i); + individualBlockEncodings[i] = block.getEncoding(); + } + return new InterleavedBlockEncoding(individualBlockEncodings); + } + + @Override + public void writePositionTo(int position, BlockBuilder blockBuilder) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + getBlock(blockIndex).writePositionTo(positionInBlock, blockBuilder); + } + + @Override + public byte getByte(int position, int offset) + { + 
position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getByte(positionInBlock, offset); + } + + @Override + public short getShort(int position, int offset) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getShort(positionInBlock, offset); + } + + @Override + public int getInt(int position, int offset) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getInt(positionInBlock, offset); + } + + @Override + public long getLong(int position, int offset) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getLong(positionInBlock, offset); + } + + @Override + public Slice getSlice(int position, int offset, int length) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getSlice(positionInBlock, offset, length); + } + + @Override + public T getObject(int position, Class clazz) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getObject(positionInBlock, clazz); + } + + @Override + public int getSliceLength(int position) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).getSliceLength(positionInBlock); + } + + @Override + public boolean equals(int position, int offset, Block otherBlock, int otherPosition, int otherOffset, int length) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; 
+ int positionInBlock = position / columns; + + return getBlock(blockIndex).equals(positionInBlock, offset, otherBlock, otherPosition, otherOffset, length); + } + + @Override + public boolean bytesEqual(int position, int offset, Slice otherSlice, int otherOffset, int length) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).bytesEqual(positionInBlock, offset, otherSlice, otherOffset, length); + } + + @Override + public long hash(int position, int offset, int length) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).hash(positionInBlock, offset, length); + } + + @Override + public int compareTo(int position, int offset, int length, Block otherBlock, int otherPosition, int otherOffset, int otherLength) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).compareTo(positionInBlock, offset, length, otherBlock, otherPosition, otherOffset, otherLength); + } + + @Override + public int bytesCompare(int position, int offset, int length, Slice otherSlice, int otherOffset, int otherLength) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).bytesCompare(positionInBlock, offset, length, otherSlice, otherOffset, otherLength); + } + + @Override + public void writeBytesTo(int position, int offset, int length, BlockBuilder blockBuilder) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + getBlock(blockIndex).writeBytesTo(positionInBlock, offset, length, blockBuilder); + } + + @Override + public Block getSingleValueBlock(int position) + { + position = 
toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + // return the underlying block directly, as it is unnecessary to wrap around it if there's only one block + return getBlock(blockIndex).getSingleValueBlock(positionInBlock); + } + + @Override + public Block copyPositions(int[] positions, int offset, int length) + { + checkArrayRange(positions, offset, length); + + int positionsPerColumn = length / columns; + + List> valuePositions = new ArrayList<>(columns); + for (int i = 0; i < columns; i++) { + valuePositions.add(new ArrayList<>(positionsPerColumn)); + } + int ordinal = 0; + for (int i = offset; i < offset + length; ++i) { + int position = positions[i]; + position = toAbsolutePosition(position); + if (ordinal % columns != position % columns) { + throw new IllegalArgumentException("Position (" + position + ") is not congruent to ordinal (" + ordinal + ") modulo columns (" + columns + ")"); + } + valuePositions.get(position % columns).add(position / columns); + ordinal++; + } + Block[] blocks = new Block[columns]; + for (int i = 0; i < columns; i++) { + blocks[i] = getBlock(i).copyPositions(valuePositions.get(i).stream().mapToInt(a -> a).toArray(), + 0, valuePositions.get(i).size()); + } + return new InterleavedBlock(blocks); + } + + @Override + public Block copyRegion(int position, int length) + { + validateRange(position, length); + return sliceRange(position, length, true); + } + + @Override + public long getRegionSizeInBytes(int position, int length) + { + if (position == 0 && length == getPositionCount()) { + // Calculation of getRegionSizeInBytes is expensive in this class. + // On the other hand, getSizeInBytes result is cached or pre-computed. 
+ return getSizeInBytes(); + } + validateRange(position, length); + long result = 0; + for (int blockIndex = 0; blockIndex < getBlockCount(); blockIndex++) { + result += getBlock(blockIndex).getRegionSizeInBytes(position / columns, length / columns); + } + return result; + } + + protected void validateRange(int position, int length) + { + int positionCount = getPositionCount(); + if (position < 0 || length < 0 || position + length > positionCount || position % columns != 0 || length % columns != 0) { + throw new IndexOutOfBoundsException("Invalid position (" + position + "), length (" + length + ") in InterleavedBlock with " + positionCount + " positions and " + columns + " columns"); + } + } + + protected Block sliceRange(int position, int length, boolean compact) + { + position = toAbsolutePosition(position); + Block[] resultBlocks = new Block[columns]; + int positionInBlock = position / columns; + int subBlockLength = length / columns; + for (int blockIndex = 0; blockIndex < columns; blockIndex++) { + if (compact) { + resultBlocks[blockIndex] = getBlock((blockIndex + position) % columns).copyRegion(positionInBlock, subBlockLength); + } + else { + resultBlocks[blockIndex] = getBlock((blockIndex + position) % columns).getRegion(positionInBlock, subBlockLength); + } + } + return new InterleavedBlock(resultBlocks); + } + + @Override + public boolean isNull(int position) + { + position = toAbsolutePosition(position); + int blockIndex = position % columns; + int positionInBlock = position / columns; + + return getBlock(blockIndex).isNull(positionInBlock); + } +} diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/block/InterleavedBlock.java b/presto-spi/src/main/java/com/facebook/presto/spi/block/InterleavedBlock.java new file mode 100644 index 0000000000000..c021733b4a1fb --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/block/InterleavedBlock.java @@ -0,0 +1,142 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + 
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spi.block;

import org.openjdk.jol.info.ClassLayout;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/**
 * An immutable block that interleaves the positions of several underlying
 * sub-blocks of equal length: logical position {@code p} maps to position
 * {@code p / columns} of sub-block {@code p % columns} (see the position
 * arithmetic in {@code AbstractInterleavedBlock}).
 * <p>
 * Regions are views: {@link #getRegion} shares the sub-block array and only
 * shifts the {@code start} offset, so the retained size of a region equals
 * that of its parent.
 */
public class InterleavedBlock
        extends AbstractInterleavedBlock
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(InterleavedBlock.class).instanceSize();

    private final Block[] blocks;
    private final InterleavedBlockEncoding blockEncoding;
    // Absolute position offset of this view into the interleaved sequence; 0 for a top-level block.
    private final int start;
    private final int positionCount;
    private final long retainedSizeInBytes;

    // Lazily computed for region views; -1 is the "not yet computed" sentinel.
    private final AtomicLong sizeInBytes;

    /**
     * Creates an interleaved block over the given sub-blocks.
     *
     * @param blocks sub-blocks to interleave; all must have the same position count
     * @throws IllegalArgumentException if the sub-blocks differ in length
     */
    public InterleavedBlock(Block[] blocks)
    {
        super(blocks.length);
        this.blocks = blocks;

        long sizeInBytes = 0;
        long retainedSizeInBytes = INSTANCE_SIZE;
        int positionCount = 0;
        int firstSubBlockPositionCount = blocks[0].getPositionCount();
        for (int i = 0; i < getBlockCount(); i++) {
            sizeInBytes += blocks[i].getSizeInBytes();
            retainedSizeInBytes += blocks[i].getRetainedSizeInBytes();
            positionCount += blocks[i].getPositionCount();

            if (firstSubBlockPositionCount != blocks[i].getPositionCount()) {
                throw new IllegalArgumentException("length of sub blocks differ: block 0: " + firstSubBlockPositionCount + ", block " + i + ": " + blocks[i].getPositionCount());
            }
        }

        this.blockEncoding = computeBlockEncoding();
        this.start = 0;
        this.positionCount = positionCount;
        this.sizeInBytes = new AtomicLong(sizeInBytes);
        this.retainedSizeInBytes = retainedSizeInBytes;
    }

    /**
     * Region-view constructor: shares {@code blocks} with the parent and defers
     * the size-in-bytes computation (sentinel -1) until first requested.
     */
    private InterleavedBlock(Block[] blocks, int start, int positionCount, long retainedSizeInBytes, InterleavedBlockEncoding blockEncoding)
    {
        super(blocks.length);
        this.blocks = blocks;
        this.start = start;
        this.positionCount = positionCount;
        this.retainedSizeInBytes = retainedSizeInBytes;
        this.blockEncoding = blockEncoding;
        this.sizeInBytes = new AtomicLong(-1);
    }

    @Override
    public Block getRegion(int position, int length)
    {
        validateRange(position, length);
        // Non-copying view: same sub-blocks, shifted start.
        return new InterleavedBlock(blocks, toAbsolutePosition(position), length, retainedSizeInBytes, blockEncoding);
    }

    @Override
    protected Block getBlock(int blockIndex)
    {
        if (blockIndex < 0) {
            throw new IllegalArgumentException("position is not valid");
        }

        return blocks[blockIndex];
    }

    @Override
    protected int toAbsolutePosition(int position)
    {
        return position + start;
    }

    @Override
    public InterleavedBlockEncoding getEncoding()
    {
        return blockEncoding;
    }

    @Override
    public int getPositionCount()
    {
        return positionCount;
    }

    @Override
    public long getSizeInBytes()
    {
        // Memoized: computed eagerly in the public constructor; computed on
        // first call here for region views created by the private constructor.
        long sizeInBytes = this.sizeInBytes.get();
        if (sizeInBytes < 0) {
            sizeInBytes = 0;
            for (int i = 0; i < getBlockCount(); i++) {
                sizeInBytes += blocks[i].getRegionSizeInBytes(start / blocks.length, positionCount / blocks.length);
            }
            this.sizeInBytes.set(sizeInBytes);
        }
        return sizeInBytes;
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        return retainedSizeInBytes;
    }

    @Override
    public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
    {
        consumer.accept(blocks, retainedSizeInBytes - INSTANCE_SIZE);
        consumer.accept(this, (long) INSTANCE_SIZE);
    }

    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder("InterleavedBlock{");
        sb.append("columns=").append(getBlockCount());
        sb.append(", positionCountPerBlock=").append(getPositionCount() / getBlockCount());
        sb.append('}');
        return sb.toString();
    }
}
/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.spi.block;

import com.facebook.presto.spi.type.Type;
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.function.BiConsumer;

import static java.util.Objects.requireNonNull;

/**
 * Builds an {@code InterleavedBlock} by dispatching writes round-robin across
 * one child {@code BlockBuilder} per column: entry {@code k} goes to child
 * {@code k % columns}.
 * <p>
 * Size accounting: before an entry's first write, the current child's size and
 * retained size are snapshotted into {@code startSize}/{@code startRetainedSize}
 * (sentinel -1 when no entry is in flight); {@link #entryAdded} folds the delta
 * into the running totals and resets the sentinels.
 */
public class InterleavedBlockBuilder
        extends AbstractInterleavedBlock
        implements BlockBuilder
{
    // TODO: This does not account for the size of the blockEncoding field
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(InterleavedBlockBuilder.class).instanceSize();

    private final BlockBuilder[] blockBuilders;
    private final InterleavedBlockEncoding blockEncoding;

    private int positionCount;
    // Index of the child builder that receives the next entry (round-robin).
    private int currentBlockIndex;
    private long sizeInBytes;
    // Child size snapshot taken at the start of the in-flight entry; -1 = none in flight.
    private long startSize;
    private long retainedSizeInBytes;
    private long startRetainedSize;

    /**
     * Creates a builder with one child per type, splitting {@code expectedEntries}
     * evenly (rounded up) across the children.
     */
    public InterleavedBlockBuilder(List<Type> types, BlockBuilderStatus blockBuilderStatus, int expectedEntries)
    {
        this(
                types.stream()
                        .map(t -> t.createBlockBuilder(blockBuilderStatus, roundUpDivide(expectedEntries, types.size())))
                        .toArray(BlockBuilder[]::new));
    }

    /**
     * Same as above, additionally passing {@code expectedBytesPerEntry} to each child.
     */
    public InterleavedBlockBuilder(List<Type> types, BlockBuilderStatus blockBuilderStatus, int expectedEntries, int expectedBytesPerEntry)
    {
        this(
                types.stream()
                        .map(t -> t.createBlockBuilder(blockBuilderStatus, roundUpDivide(expectedEntries, types.size()), expectedBytesPerEntry))
                        .toArray(BlockBuilder[]::new));
    }

    private static int roundUpDivide(int dividend, int divisor)
    {
        return (dividend + divisor - 1) / divisor;
    }

    /**
     * Caller of this private constructor is responsible for making sure every element in `blockBuilders` is constructed with the same `blockBuilderStatus`
     */
    private InterleavedBlockBuilder(BlockBuilder[] blockBuilders)
    {
        super(blockBuilders.length);
        this.blockBuilders = requireNonNull(blockBuilders, "blockBuilders is null");
        this.blockEncoding = computeBlockEncoding();
        this.positionCount = 0;
        this.sizeInBytes = 0;
        this.retainedSizeInBytes = INSTANCE_SIZE;
        for (BlockBuilder blockBuilder : blockBuilders) {
            this.sizeInBytes += blockBuilder.getSizeInBytes();
            this.retainedSizeInBytes += blockBuilder.getRetainedSizeInBytes();
        }
        this.startSize = -1;
        this.startRetainedSize = -1;
    }

    @Override
    protected Block getBlock(int blockIndex)
    {
        if (blockIndex < 0) {
            throw new IllegalArgumentException("position is not valid");
        }

        return blockBuilders[blockIndex];
    }

    @Override
    protected int toAbsolutePosition(int position)
    {
        // A builder is never a region view, so logical position == absolute position.
        return position;
    }

    @Override
    public InterleavedBlockEncoding getEncoding()
    {
        return blockEncoding;
    }

    @Override
    public int getPositionCount()
    {
        return positionCount;
    }

    @Override
    public long getSizeInBytes()
    {
        return sizeInBytes;
    }

    @Override
    public long getRetainedSizeInBytes()
    {
        return retainedSizeInBytes;
    }

    @Override
    public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
    {
        consumer.accept(blockBuilders, retainedSizeInBytes - INSTANCE_SIZE);
        consumer.accept(this, (long) INSTANCE_SIZE);
    }

    // Snapshot the child's sizes at the first write of an entry, so entryAdded
    // can account only the bytes this entry contributed.
    private void recordStartSizesIfNecessary(BlockBuilder blockBuilder)
    {
        if (startSize < 0) {
            startSize = blockBuilder.getSizeInBytes();
        }
        if (startRetainedSize < 0) {
            startRetainedSize = blockBuilder.getRetainedSizeInBytes();
        }
    }

    @Override
    public BlockBuilder writeByte(int value)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeByte(value);
        return this;
    }

    @Override
    public BlockBuilder writeShort(int value)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeShort(value);
        return this;
    }

    @Override
    public BlockBuilder writeInt(int value)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeInt(value);
        return this;
    }

    @Override
    public BlockBuilder writeLong(long value)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeLong(value);
        return this;
    }

    @Override
    public BlockBuilder writeBytes(Slice source, int sourceIndex, int length)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeBytes(source, sourceIndex, length);
        return this;
    }

    @Override
    public BlockBuilder writeObject(Object value)
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        blockBuilder.writeObject(value);
        return this;
    }

    @Override
    public BlockBuilder beginBlockEntry()
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        recordStartSizesIfNecessary(blockBuilder);
        return blockBuilder.beginBlockEntry();
    }

    @Override
    public BlockBuilder closeEntry()
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        if (startSize < 0 || startRetainedSize < 0) {
            throw new IllegalStateException("closeEntry called before anything is written");
        }
        blockBuilder.closeEntry();
        entryAdded();
        return this;
    }

    @Override
    public BlockBuilder appendNull()
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        if (startSize >= 0 || startRetainedSize >= 0) {
            throw new IllegalStateException("appendNull called when some entry has been written");
        }
        startSize = blockBuilder.getSizeInBytes();
        startRetainedSize = blockBuilder.getRetainedSizeInBytes();
        blockBuilder.appendNull();
        entryAdded();
        return this;
    }

    // Folds the in-flight entry's byte delta into the totals, clears the
    // sentinels, and advances the round-robin cursor.
    private void entryAdded()
    {
        BlockBuilder blockBuilder = blockBuilders[currentBlockIndex];
        sizeInBytes += blockBuilder.getSizeInBytes() - startSize;
        retainedSizeInBytes += blockBuilder.getRetainedSizeInBytes() - startRetainedSize;
        startSize = -1;
        startRetainedSize = -1;

        positionCount++;
        currentBlockIndex++;
        if (currentBlockIndex == getBlockCount()) {
            currentBlockIndex = 0;
        }

        // All bytes added have been reported to blockBuilderStatus by child block builders. No report to blockBuilderStatus necessary here.
    }

    @Override
    public Block getRegion(int position, int length)
    {
        validateRange(position, length);
        return sliceRange(position, length, false);
    }

    @Override
    public InterleavedBlock build()
    {
        Block[] blocks = new Block[getBlockCount()];
        for (int i = 0; i < getBlockCount(); i++) {
            blocks[i] = blockBuilders[i].build();
        }
        return new InterleavedBlock(blocks);
    }

    @Override
    public BlockBuilder newBlockBuilderLike(BlockBuilderStatus blockBuilderStatus)
    {
        BlockBuilder[] newBlockBuilders = new BlockBuilder[blockBuilders.length];
        for (int i = 0; i < blockBuilders.length; i++) {
            newBlockBuilders[i] = blockBuilders[i].newBlockBuilderLike(blockBuilderStatus);
        }
        return new InterleavedBlockBuilder(newBlockBuilders);
    }

    @Override
    public String toString()
    {
        // Name the builder class (not InterleavedBlock) so diagnostics are unambiguous.
        StringBuilder sb = new StringBuilder("InterleavedBlockBuilder{");
        sb.append("columns=").append(getBlockCount());
        sb.append(", positionCount=").append(getPositionCount());
        sb.append('}');
        return sb.toString();
    }
}
+ */ +package com.facebook.presto.spi.block; + +import com.facebook.presto.spi.type.TypeManager; +import io.airlift.slice.SliceInput; +import io.airlift.slice.SliceOutput; + +public class InterleavedBlockEncoding + implements BlockEncoding +{ + public static final BlockEncodingFactory FACTORY = new InterleavedBlockEncodingFactory(); + private static final String NAME = "INTERLEAVED"; + + private final BlockEncoding[] individualBlockEncodings; + + public InterleavedBlockEncoding(BlockEncoding[] individualBlockEncodings) + { + this.individualBlockEncodings = individualBlockEncodings; + } + + @Override + public String getName() + { + return NAME; + } + + @Override + public void writeBlock(SliceOutput sliceOutput, Block block) + { + AbstractInterleavedBlock interleavedBlock = (AbstractInterleavedBlock) block; + + if (interleavedBlock.getBlockCount() != individualBlockEncodings.length) { + throw new IllegalArgumentException( + "argument block differs in length (" + interleavedBlock.getBlockCount() + ") with this encoding (" + individualBlockEncodings.length + ")"); + } + + Block[] subBlocks = interleavedBlock.computeSerializableSubBlocks(); + for (int i = 0; i < subBlocks.length; i++) { + individualBlockEncodings[i].writeBlock(sliceOutput, subBlocks[i]); + } + } + + @Override + public Block readBlock(SliceInput sliceInput) + { + Block[] individualBlocks = new Block[individualBlockEncodings.length]; + for (int i = 0; i < individualBlockEncodings.length; i++) { + individualBlocks[i] = individualBlockEncodings[i].readBlock(sliceInput); + } + return new InterleavedBlock(individualBlocks); + } + + @Override + public BlockEncodingFactory getFactory() + { + return FACTORY; + } + + public static class InterleavedBlockEncodingFactory + implements BlockEncodingFactory + { + @Override + public String getName() + { + return NAME; + } + + @Override + public InterleavedBlockEncoding readEncoding(TypeManager manager, BlockEncodingSerde serde, SliceInput input) + { + int 
individualBlockEncodingsCount = input.readInt(); + BlockEncoding[] individualBlockEncodings = new BlockEncoding[individualBlockEncodingsCount]; + for (int i = 0; i < individualBlockEncodingsCount; i++) { + individualBlockEncodings[i] = serde.readBlockEncoding(input); + } + return new InterleavedBlockEncoding(individualBlockEncodings); + } + + @Override + public void writeEncoding(BlockEncodingSerde serde, SliceOutput output, InterleavedBlockEncoding blockEncoding) + { + output.appendInt(blockEncoding.individualBlockEncodings.length); + for (BlockEncoding individualBlockEncoding : blockEncoding.individualBlockEncodings) { + serde.writeBlockEncoding(output, individualBlockEncoding); + } + } + } +}