From 0f526c97be7f589e3026bf53ed9f0ad7bbe60fba Mon Sep 17 00:00:00 2001 From: Jimmy Xiang Date: Sat, 19 Nov 2016 01:15:04 -0800 Subject: [PATCH] Support Schema Evolution in Parquet --- .../hive/parquet/ParquetHiveRecordCursor.java | 467 ++++++++++++++++-- .../presto/hive/TestHiveFileFormats.java | 246 ++++++++- 2 files changed, 642 insertions(+), 71 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java index 234866e7e7d78..ad473b5389e3d 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/parquet/ParquetHiveRecordCursor.java @@ -22,13 +22,16 @@ import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilderStatus; +import com.facebook.presto.spi.block.InterleavedBlockBuilder; import com.facebook.presto.spi.predicate.TupleDomain; import com.facebook.presto.spi.type.DecimalType; import com.facebook.presto.spi.type.Decimals; +import com.facebook.presto.spi.type.NamedTypeSignature; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; + import io.airlift.slice.Slice; import it.unimi.dsi.fastutil.longs.LongArrayList; import org.apache.hadoop.conf.Configuration; @@ -57,7 +60,9 @@ import java.io.IOException; import java.math.BigInteger; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -333,7 +338,7 @@ private ParquetRecordReader createParquetRecordReader( FileMetaData fileMetaData = parquetMetadata.getFileMetaData(); MessageType fileSchema = fileMetaData.getSchema(); - PrestoReadSupport readSupport = new PrestoReadSupport(useParquetColumnNames, columns, fileSchema); + PrestoReadSupport readSupport = new PrestoReadSupport(useParquetColumnNames, columns, fileSchema, typeManager); List fields = columns.stream() .filter(column -> column.getColumnType() == REGULAR) @@ -408,7 +413,7 @@ public final class PrestoReadSupport private final List columns; private final List converters; - public PrestoReadSupport(boolean useParquetColumnNames, List columns, MessageType messageType) + public PrestoReadSupport(boolean useParquetColumnNames, List columns, MessageType messageType, TypeManager typeManager) { this.columns = columns; this.useParquetColumnNames = useParquetColumnNames; @@ -431,7 +436,7 @@ public PrestoReadSupport(boolean useParquetColumnNames, List c } } else { - converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i), i)); + converters.add(new ParquetColumnConverter(createGroupConverter(types[i], parquetType.getName(), parquetType, i, useParquetColumnNames, typeManager), i)); } } } @@ -684,43 +689,77 @@ private interface BlockConverter void afterValue(); } + private interface IndexedBlockConverter extends BlockConverter + { + void setFieldIndex(int fieldIndex); + int getFieldIndex(); + Type getType(); + } + private abstract static class GroupedConverter extends GroupConverter - implements BlockConverter + implements IndexedBlockConverter { public abstract Block getBlock(); } - private static BlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex) + private static IndexedBlockConverter createConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex, boolean useNames, TypeManager typeManager) { if (parquetType.isPrimitive()) { if (parquetType.getOriginalType() == DECIMAL) { DecimalMetadata decimalMetadata = ((PrimitiveType) parquetType).getDecimalMetadata(); - return new ParquetDecimalConverter(createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale())); + return new ParquetDecimalConverter(createDecimalType(decimalMetadata.getPrecision(), decimalMetadata.getScale()), fieldIndex); } else { return new ParquetPrimitiveConverter(prestoType, fieldIndex); } } - return createGroupConverter(prestoType, columnName, parquetType, fieldIndex); + return createGroupConverter(prestoType, columnName, parquetType, fieldIndex, useNames, typeManager); } - private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex) + private static GroupedConverter createGroupConverter(Type prestoType, String columnName, parquet.schema.Type parquetType, int fieldIndex, boolean useNames, TypeManager typeManager) { GroupType groupType = parquetType.asGroupType(); switch (prestoType.getTypeSignature().getBase()) { case ARRAY: - return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetListConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); case MAP: - return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetMapConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); case ROW: - return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex); + return new ParquetStructConverter(prestoType, columnName, groupType, fieldIndex, useNames, typeManager); default: throw new IllegalArgumentException("Column " + columnName + " type " + parquetType.getOriginalType() + " not supported"); } } + private static class FieldInformation + { + private final Type type; + private final int index; + + private FieldInformation(int index, Type type) + { + this.index = index; + this.type = type; + } + + static FieldInformation of(int index, Type type) + { + return new FieldInformation(index, type); + } + + public Type getType() + { + return type; + } + + public int getIndex() + { + return index; + } + } + private static class ParquetStructConverter extends GroupedConverter { @@ -728,34 +767,106 @@ private static class ParquetStructConverter private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type rowType; - private final int fieldIndex; + private int fieldIndex; - private final List converters; + private final List converters; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex) + private int[] converterIndexes; + private List outOfOrderFieldTypes; + private InterleavedBlockBuilder outOfOrderBlockBuilder; // a tmp builder for fields out of order + + private List createConverters(String columnName, List prestoTypeParameters, List fieldTypes, TypeManager typeManager) + { + ImmutableList.Builder converters = ImmutableList.builder(); + for (int i = 0, n = prestoTypeParameters.size(); i < n; i++) { + parquet.schema.Type fieldType = fieldTypes.get(i); + converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i, false, typeManager)); + } + return converters.build(); + } + + private List createConvertersByName(String columnName, Type prestoType, List fieldTypes, TypeManager typeManager) + { + int parquetFieldCount = fieldTypes.size(); + int prestoFieldCount = prestoType.getTypeSignature().getParameters().size(); + + Map fieldTypeMap = new HashMap<>(prestoFieldCount); + for (int i = 0; i < prestoFieldCount; i++) { + NamedTypeSignature namedTypeSignature = prestoType.getTypeSignature().getParameters().get(i).getNamedTypeSignature(); + fieldTypeMap.put(namedTypeSignature.getName().toLowerCase(), FieldInformation.of(i, typeManager.getType(namedTypeSignature.getTypeSignature()))); + } + + boolean outOfOrder = false; + ImmutableList.Builder builder = ImmutableList.builder(); + for (int i = 0, prevFieldIndex = -1; i < parquetFieldCount; i++) { + parquet.schema.Type fieldType = fieldTypes.get(i); + FieldInformation fieldInformation = fieldTypeMap.get(fieldType.getName().toLowerCase()); + if (fieldInformation == null) { + builder.add(fieldType.isPrimitive() ? ParquetDummyPrimitiveConverter.converter : new ParquetDummyGroupConverter(fieldType)); + } + else { + if (prevFieldIndex > fieldInformation.getIndex()) { + outOfOrder = true; + } + builder.add(createConverter(fieldInformation.getType(), columnName + "." + fieldType.getName(), fieldType, fieldInformation.getIndex(), true, typeManager)); + prevFieldIndex = fieldInformation.getIndex(); + } + } + // When reading parquet files, we need to pass converters and parquet types in parquet schema fields order. + // After getting the parquet values, we need to build presto blocks in metastore schema fields order. + List converters = builder.build(); + if (outOfOrder) { + ImmutableList.Builder types = ImmutableList.builder(); + converterIndexes = new int[parquetFieldCount]; + for (int i = 0, tmpIndex = 0; i < parquetFieldCount; i++) { + IndexedBlockConverter converter = converters.get(i); + converterIndexes[i] = converter.getFieldIndex(); + if (converterIndexes[i] < 0) { + continue; + } + converter.setFieldIndex(tmpIndex++); + types.add(converter.getType()); + } + outOfOrderFieldTypes = types.build(); + } + return converters; + } + + public ParquetStructConverter(Type prestoType, String columnName, GroupType entryType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument(ROW.equals(prestoType.getTypeSignature().getBase())); - List prestoTypeParameters = prestoType.getTypeParameters(); List fieldTypes = entryType.getFields(); - checkArgument( - prestoTypeParameters.size() == fieldTypes.size(), - "Schema mismatch, metastore schema for row column %s has %s fields but parquet schema has %s fields", - columnName, - prestoTypeParameters.size(), - fieldTypes.size()); + + if (useNames || fieldTypes.size() != prestoType.getTypeSignature().getParameters().size()) { + this.converters = createConvertersByName(columnName, prestoType, fieldTypes, typeManager); + } + else { + this.converters = createConverters(columnName, prestoType.getTypeParameters(), fieldTypes, typeManager); + } this.rowType = prestoType; this.fieldIndex = fieldIndex; + } + + @Override + public Type getType() + { + return rowType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } - ImmutableList.Builder converters = ImmutableList.builder(); - for (int i = 0; i < prestoTypeParameters.size(); i++) { - parquet.schema.Type fieldType = fieldTypes.get(i); - converters.add(createConverter(prestoTypeParameters.get(i), columnName + "." + fieldType.getName(), fieldType, i)); - } - this.converters = converters.build(); + @Override + public int getFieldIndex() + { + return fieldIndex; } @Override @@ -768,6 +879,9 @@ public Converter getConverter(int fieldIndex) public void beforeValue(BlockBuilder builder) { this.builder = builder; + if (outOfOrderFieldTypes != null) { + outOfOrderBlockBuilder = new InterleavedBlockBuilder(outOfOrderFieldTypes, new BlockBuilderStatus(), NULL_BUILDER_POSITIONS_THRESHOLD); + } } @Override @@ -777,15 +891,23 @@ public void start() if (nullBuilder == null || (nullBuilder.getPositionCount() >= NULL_BUILDER_POSITIONS_THRESHOLD && nullBuilder.getSizeInBytes() >= NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD)) { nullBuilder = rowType.createBlockBuilder(new BlockBuilderStatus(), NULL_BUILDER_POSITIONS_THRESHOLD); } - currentEntryBuilder = nullBuilder.beginBlockEntry(); + currentEntryBuilder = nullBuilder; } else { while (builder.getPositionCount() < fieldIndex) { builder.appendNull(); } - currentEntryBuilder = builder.beginBlockEntry(); + currentEntryBuilder = builder; } - for (BlockConverter converter : converters) { + + if (outOfOrderBlockBuilder != null) { + currentEntryBuilder = outOfOrderBlockBuilder; + } + else { + currentEntryBuilder = currentEntryBuilder.beginBlockEntry(); + } + + for (IndexedBlockConverter converter : converters) { converter.beforeValue(currentEntryBuilder); } } @@ -796,7 +918,32 @@ public void end() for (BlockConverter converter : converters) { converter.afterValue(); } - while (currentEntryBuilder.getPositionCount() < converters.size()) { + if (outOfOrderBlockBuilder != null) { + if (builder == null) { + currentEntryBuilder = nullBuilder; + } + else { + currentEntryBuilder = builder; + } + + currentEntryBuilder = currentEntryBuilder.beginBlockEntry(); + List indexedConverters = new ArrayList<>(converters); + indexedConverters.sort((IndexedBlockConverter converter1, IndexedBlockConverter converter2)->Integer.compare(converterIndexes[converter1.getFieldIndex()], converterIndexes[converter2.getFieldIndex()])); + int positionCount = outOfOrderBlockBuilder.getPositionCount(); + for (int i = 0; i < converterIndexes.length; i++) { + int position = indexedConverters.get(i).getFieldIndex(); + if (converterIndexes[position] < 0 || position >= positionCount) { + continue; + } + while (currentEntryBuilder.getPositionCount() < converterIndexes[position]) { + currentEntryBuilder.appendNull(); + } + outOfOrderBlockBuilder.writePositionTo(position, currentEntryBuilder); + currentEntryBuilder.closeEntry(); + } + } + int fieldCount = rowType.getTypeParameters().size(); + while (currentEntryBuilder.getPositionCount() < fieldCount) { currentEntryBuilder.appendNull(); } @@ -828,14 +975,14 @@ private static class ParquetListConverter private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type arrayType; - private final int fieldIndex; + private int fieldIndex; private final BlockConverter elementConverter; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex) + public ParquetListConverter(Type prestoType, String columnName, GroupType listType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument( listType.getFieldCount() == 1, @@ -861,10 +1008,10 @@ public ParquetListConverter(Type prestoType, String columnName, GroupType listTy // documentation at http://git.io/vOpNz. parquet.schema.Type elementType = listType.getType(0); if (isElementType(elementType, listType.getName())) { - elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0); + elementConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".element", elementType, 0, useNames, typeManager); } else { - elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType()); + elementConverter = new ParquetListEntryConverter(prestoType.getTypeParameters().get(0), columnName, elementType.asGroupType(), useNames, typeManager); } } @@ -890,6 +1037,24 @@ private boolean isElementType(parquet.schema.Type repeatedType, String parentNam return false; } + @Override + public Type getType() + { + return arrayType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; + } + @Override public void beforeValue(BlockBuilder builder) { @@ -958,7 +1123,7 @@ private static class ParquetListEntryConverter private BlockBuilder builder; private int startingPosition; - public ParquetListEntryConverter(Type prestoType, String columnName, GroupType elementType) + public ParquetListEntryConverter(Type prestoType, String columnName, GroupType elementType, boolean useNames, TypeManager typeManager) { checkArgument( elementType.getOriginalType() == null, @@ -972,7 +1137,7 @@ public ParquetListEntryConverter(Type prestoType, String columnName, GroupType e columnName, elementType.getFieldCount()); - elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0); + elementConverter = createConverter(prestoType, columnName + ".element", elementType.getType(0), 0, useNames, typeManager); } @Override @@ -1020,14 +1185,14 @@ private static class ParquetMapConverter private static final int NULL_BUILDER_SIZE_IN_BYTES_THRESHOLD = 32768; private final Type mapType; - private final int fieldIndex; + private int fieldIndex; private final ParquetMapEntryConverter entryConverter; private BlockBuilder builder; private BlockBuilder nullBuilder; // used internally when builder is set to null private BlockBuilder currentEntryBuilder; - public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex) + public ParquetMapConverter(Type type, String columnName, GroupType mapType, int fieldIndex, boolean useNames, TypeManager typeManager) { checkArgument( mapType.getFieldCount() == 1, @@ -1040,7 +1205,25 @@ public ParquetMapConverter(Type type, String columnName, GroupType mapType, int parquet.schema.Type entryType = mapType.getFields().get(0); - entryConverter = new ParquetMapEntryConverter(type, columnName + ".entry", entryType.asGroupType()); + entryConverter = new ParquetMapEntryConverter(type, columnName + ".entry", entryType.asGroupType(), useNames, typeManager); + } + + @Override + public Type getType() + { + return mapType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; } @Override @@ -1111,7 +1294,7 @@ private static class ParquetMapEntryConverter private BlockBuilder builder; - public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType entryType) + public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType entryType, boolean useNames, TypeManager typeManager) { checkArgument(MAP.equals(prestoType.getTypeSignature().getBase())); // original version of parquet used null for entry due to a bug @@ -1146,8 +1329,8 @@ public ParquetMapEntryConverter(Type prestoType, String columnName, GroupType en columnName, entryGroupType.getType(0)); - keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0); - valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1); + keyConverter = createConverter(prestoType.getTypeParameters().get(0), columnName + ".key", entryGroupType.getFields().get(0), 0, useNames, typeManager); + valueConverter = createConverter(prestoType.getTypeParameters().get(1), columnName + ".value", entryGroupType.getFields().get(1), 1, useNames, typeManager); } @Override @@ -1195,10 +1378,10 @@ public void afterValue() private static class ParquetPrimitiveConverter extends PrimitiveConverter - implements BlockConverter + implements IndexedBlockConverter { private final Type type; - private final int fieldIndex; + private int fieldIndex; private BlockBuilder builder; public ParquetPrimitiveConverter(Type type, int fieldIndex) @@ -1207,6 +1390,24 @@ public ParquetPrimitiveConverter(Type type, int fieldIndex) this.fieldIndex = fieldIndex; } + @Override + public Type getType() + { + return type; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; + } + @Override public void beforeValue(BlockBuilder builder) { @@ -1309,15 +1510,35 @@ public void addInt(int value) private static class ParquetDecimalConverter extends PrimitiveConverter - implements BlockConverter + implements IndexedBlockConverter { private final DecimalType decimalType; + private int fieldIndex; private BlockBuilder builder; private boolean wroteValue; - public ParquetDecimalConverter(DecimalType decimalType) + public ParquetDecimalConverter(DecimalType decimalType, int fieldIndex) { this.decimalType = requireNonNull(decimalType, "decimalType is null"); + this.fieldIndex = fieldIndex; + } + + @Override + public Type getType() + { + return decimalType; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + this.fieldIndex = fieldIndex; + } + + @Override + public int getFieldIndex() + { + return fieldIndex; } @Override @@ -1368,4 +1589,156 @@ public void addBinary(Binary value) wroteValue = true; } } + + private static class ParquetDummyPrimitiveConverter + extends PrimitiveConverter + implements IndexedBlockConverter + { + static ParquetDummyPrimitiveConverter converter = new ParquetDummyPrimitiveConverter(); + + @Override + public Type getType() + { + return null; + } + + @Override + public void setFieldIndex(int fieldIndex) + { + } + + @Override + public int getFieldIndex() + { + return -1; + } + + @Override + public void beforeValue(BlockBuilder builder) + { + } + + @Override + public void afterValue() + { + } + + @Override + public boolean isPrimitive() + { + return true; + } + + @Override + public PrimitiveConverter asPrimitiveConverter() + { + return this; + } + + @Override + public boolean hasDictionarySupport() + { + return false; + } + + @Override + public void setDictionary(Dictionary dictionary) + { + } + + @Override + public void addValueFromDictionary(int dictionaryId) + { + } + + @Override + public void addBoolean(boolean value) + { + } + + @Override + public void addDouble(double value) + { + } + + @Override + public void addLong(long value) + { + } + + @Override + public void addBinary(Binary value) + { + } + + @Override + public void addFloat(float value) + { + } + + @Override + public void addInt(int value) + { + } + } + + private static class ParquetDummyGroupConverter + extends GroupConverter + implements IndexedBlockConverter + { + private final List converters; + + public ParquetDummyGroupConverter(parquet.schema.Type fieldType) + { + requireNonNull(fieldType, "fieldType is null"); + ImmutableList.Builder builder = ImmutableList.builder(); + for (parquet.schema.Type type : fieldType.asGroupType().getFields()) { + builder.add(type.isPrimitive() ? ParquetDummyPrimitiveConverter.converter : new ParquetDummyGroupConverter(type)); + } + converters = builder.build(); + } + + @Override + public void beforeValue(BlockBuilder builder) + { + } + + @Override + public void afterValue() + { + } + + @Override + public void setFieldIndex(int fieldIndex) + { + } + + @Override + public int getFieldIndex() + { + return -1; + } + + @Override + public Type getType() + { + return null; + } + + @Override + public void end() + { + } + + @Override + public Converter getConverter(int fieldIndex) + { + return converters.get(fieldIndex); + } + + @Override + public void start() + { + } + } } diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java index c3b7ba99bbbc5..c0b4a8014a84e 100644 --- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java +++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java @@ -23,7 +23,9 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordPageSource; +import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.predicate.TupleDomain; +import com.facebook.presto.spi.type.Type; import com.facebook.presto.testing.TestingConnectorSession; import com.facebook.presto.type.ArrayType; import com.facebook.presto.type.RowType; @@ -32,6 +34,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveVarchar; @@ -44,6 +47,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; @@ -98,6 +102,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +@SuppressWarnings({"deprecation", "unused"}) public class TestHiveFileFormats extends AbstractTestHiveFileFormats { @@ -402,54 +407,247 @@ private static List getTestColumnsSupportedByParquet() .collect(toList()); } - @Test(dataProvider = "rowCount") - public void testParquetThrift(int rowCount) + @Test + public void testParquetExtraStructFields1() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Bob", "Roberts", null); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("first_name", "last_name", "suffix"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("number", "type", "fake_name_struct"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector, nameObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType(), nameType), "1234567890", null, null); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType(), nameType), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetExtraStructFields2() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Bob", null, "Roberts", null); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("first_name", "middle", "last_name", "suffix"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector, javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("fake_name_struct", "number", "type"), + ImmutableList.of(nameObjectInspector, javaStringObjectInspector, javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(nameType, createUnboundedVarcharType(), createUnboundedVarcharType()), null, "1234567890", null); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(nameType, createUnboundedVarcharType(), createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetRemovedStructFields1() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Bob", null); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("first_name", "middle"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("number"), + ImmutableList.of(javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(createUnboundedVarcharType()), "1234567890"); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetRemovedStructFields2() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Roberts"); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("last_name"), + ImmutableList.of(javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("fake_name_struct", "number"), + ImmutableList.of(nameObjectInspector, javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(nameType, createUnboundedVarcharType()), null, "1234567890"); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(nameType, createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetStructFieldOrderChanged() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Roberts", "Bob"); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("last_name", "first_name"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("type", "number"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), null, "1234567890"); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetStructFieldOrderChangedWithExtraFields() + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType(), createUnboundedVarcharType()); + RowType nameType = new RowType(nameTypes, Optional.empty()); + Block expectedNameBlock = rowBlockOf(nameTypes, null, "Roberts", null, "Bob"); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("middle", "last_name", "suffix", "first_name"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector, javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("type", "fake_name_struct", "number"), + ImmutableList.of(javaStringObjectInspector, nameObjectInspector, javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), nameType, createUnboundedVarcharType()), null, null, "1234567890"); + + testParquetReadAndVerify(nameType, + new RowType(ImmutableList.of(createUnboundedVarcharType(), nameType, createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + true); + } + + @Test + public void testParquetThriftUseNames() + throws Exception + { + testParquetThrift(true); + } + + @Test + public void testParquetThriftUseIndexes() + throws Exception + { + testParquetThrift(false); + } + + private void testParquetThrift(boolean useParquetColumnNames) + throws Exception + { + List nameTypes = ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()); + Block expectedNameBlock = rowBlockOf(nameTypes, "Bob", "Roberts"); + StandardStructObjectInspector nameObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("first_name", "last_name"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) + ); + + StandardStructObjectInspector phoneObjectInspector = getStandardStructObjectInspector( + ImmutableList.of("number", "type"), + ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) + ); + Block expectedPhoneBlock = rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), "1234567890", null); + + testParquetReadAndVerify(new RowType(nameTypes, Optional.empty()), + new RowType(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), Optional.empty()), + nameObjectInspector, + phoneObjectInspector, + expectedNameBlock, + expectedPhoneBlock, + useParquetColumnNames); + } + + private void testParquetReadAndVerify(RowType nameType, + RowType phoneType, + StandardStructObjectInspector nameObjectInspector, + StandardStructObjectInspector phoneObjectInspector, + Block expectedNameBlock, + Block expectedPhoneBlock, + boolean useParquetColumnNames) throws Exception { - RowType nameType = new RowType(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), Optional.empty()); - RowType phoneType = new RowType(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), Optional.empty()); RowType personType = new RowType(ImmutableList.of(nameType, INTEGER, createUnboundedVarcharType(), new ArrayType(phoneType)), Optional.empty()); - List testColumns = ImmutableList.of( + List testColumns = ImmutableList.of( new TestColumn( "persons", getStandardListObjectInspector( getStandardStructObjectInspector( ImmutableList.of("name", "id", "email", "phones"), - ImmutableList.of( - getStandardStructObjectInspector( - ImmutableList.of("first_name", "last_name"), - ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) - ), + ImmutableList.of( + nameObjectInspector, javaIntObjectInspector, javaStringObjectInspector, - getStandardListObjectInspector( - getStandardStructObjectInspector( - ImmutableList.of("number", "type"), - ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector) - ) - ) + getStandardListObjectInspector(phoneObjectInspector)) ) - ) - ), + ), null, arrayBlockOf(personType, rowBlockOf(ImmutableList.of(nameType, INTEGER, createUnboundedVarcharType(), new ArrayType(phoneType)), - rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), "Bob", "Roberts"), + expectedNameBlock, 0, "bob.roberts@example.com", - arrayBlockOf(phoneType, rowBlockOf(ImmutableList.of(createUnboundedVarcharType(), createUnboundedVarcharType()), "1234567890", null)) + arrayBlockOf(phoneType, expectedPhoneBlock)) ) ) - ) ); InputFormat inputFormat = new MapredParquetInputFormat(); - @SuppressWarnings("deprecation") SerDe serde = new ParquetHiveSerDe(); File file = new File(this.getClass().getClassLoader().getResource("addressbook.parquet").getPath()); FileSplit split = new FileSplit(new Path(file.getAbsolutePath()), 0, file.length(), new String[0]); - HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(false, HDFS_ENVIRONMENT); + HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(useParquetColumnNames, HDFS_ENVIRONMENT); testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns, 1); } @@ -584,7 +782,7 @@ public void testFailForLongVarcharPartitionColumn() private void testCursorProvider(HiveRecordCursorProvider cursorProvider, FileSplit split, InputFormat inputFormat, - @SuppressWarnings("deprecation") SerDe serde, + SerDe serde, List testColumns, int rowCount) throws IOException