diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/ParquetHiveRecordCursor.java b/presto-hive/src/main/java/com/facebook/presto/hive/ParquetHiveRecordCursor.java
index 2695b5586e108..9901ebd7c1218 100644
--- a/presto-hive/src/main/java/com/facebook/presto/hive/ParquetHiveRecordCursor.java
+++ b/presto-hive/src/main/java/com/facebook/presto/hive/ParquetHiveRecordCursor.java
@@ -736,7 +736,28 @@ public ParquetListConverter(String columnName, GroupType listType)
                     columnName,
                     listType.getFieldCount());
 
-            elementConverter = new ParquetListEntryConverter(columnName, listType.getType(0).asGroupType());
+            // The Parquet specification requires that the element value of a
+            // LIST type be wrapped in an inner repeated group, like so:
+            //
+            // optional group listField (LIST) {
+            //   repeated group list {
+            //     optional int element
+            //   }
+            // }
+            //
+            // However, some parquet libraries don't follow this spec. The
+            // compatibility rules used here are specified in the Parquet
+            // documentation at http://git.io/vf3wG.
+            parquet.schema.Type elementType = listType.getType(0);
+            if (elementType.isPrimitive() ||
+                    elementType.asGroupType().getFieldCount() > 1 ||
+                    elementType.getName().equals("array") ||
+                    elementType.getName().equals(listType.getName() + "_tuple")) {
+                elementConverter = createConverter(columnName + ".element", elementType);
+            }
+            else {
+                elementConverter = new ParquetListEntryConverter(columnName, elementType.asGroupType());
+            }
         }
 
         @Override
@@ -819,7 +840,7 @@ public Converter getConverter(int fieldIndex)
             if (fieldIndex == 0) {
                 return (Converter) elementConverter;
            }
-            throw new IllegalArgumentException("LIST entry field must be 0 or 1 not " + fieldIndex);
+            throw new IllegalArgumentException("LIST entry field must be 0 not " + fieldIndex);
         }
 
         @Override
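For context on the converter change above, here is a minimal, self-contained sketch of the compatibility rule it implements. This is not part of the patch: the class name `ListElementLayoutCheck` and the example schemas are illustrative, and it assumes the same pre-relocation `parquet.schema` API that `ParquetHiveRecordCursor` already uses.

```java
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.Type;

public final class ListElementLayoutCheck
{
    // Mirrors the new condition in ParquetListConverter: the single child of a LIST
    // group is treated as the element itself (legacy two-level layout) rather than as
    // the spec's repeated wrapper group when any of these hold.
    static boolean elementIsNotWrapped(GroupType listType)
    {
        Type child = listType.getType(0);
        return child.isPrimitive()
                || child.asGroupType().getFieldCount() > 1
                || child.getName().equals("array")
                || child.getName().equals(listType.getName() + "_tuple");
    }

    public static void main(String[] args)
    {
        // Standard three-level layout from the Parquet spec: a repeated "list" group wraps the element.
        MessageType standard = MessageTypeParser.parseMessageType(
                "message m { optional group listField (LIST) { repeated group list { optional int32 element; } } }");
        // Legacy layout with the element written directly as a repeated primitive.
        MessageType legacyPrimitive = MessageTypeParser.parseMessageType(
                "message m { optional group listField (LIST) { repeated int32 listField; } }");
        // Legacy layout produced by older writers: a repeated "<field>_tuple" group is itself the element.
        MessageType legacyTuple = MessageTypeParser.parseMessageType(
                "message m { optional group listField (LIST) { repeated group listField_tuple { optional int32 x; optional int32 y; } } }");

        System.out.println(elementIsNotWrapped(standard.getType(0).asGroupType()));        // false -> ParquetListEntryConverter
        System.out.println(elementIsNotWrapped(legacyPrimitive.getType(0).asGroupType())); // true  -> convert the child directly
        System.out.println(elementIsNotWrapped(legacyTuple.getType(0).asGroupType()));     // true  -> convert the child directly
    }
}
```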
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileFormats.java
index 2f4fd433232d7..0bc7baa5867bf 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileFormats.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/AbstractTestHiveFileFormats.java
@@ -113,7 +113,6 @@ public abstract class AbstractTestHiveFileFormats
 {
     private static final ConnectorSession SESSION = new ConnectorSession("user", UTC_KEY, ENGLISH, System.currentTimeMillis(), null);
 
-    private static final int NUM_ROWS = 1000;
     private static final double EPSILON = 0.001;
 
     private static final TypeManager TYPE_MANAGER = new TypeRegistry();
@@ -126,6 +125,7 @@ public abstract class AbstractTestHiveFileFormats
     public static final String TIMESTAMP_STRING = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").print(TIMESTAMP);
 
     // TODO: support null values and determine if timestamp and binary are allowed as partition keys
+    public static final int NUM_ROWS = 1000;
     public static final List<TestColumn> TEST_COLUMNS = ImmutableList.<TestColumn>builder()
             .add(new TestColumn("p_empty_string", javaStringObjectInspector, "", Slices.EMPTY_SLICE, true))
             .add(new TestColumn("p_string", javaStringObjectInspector, "test", Slices.utf8Slice("test"), true))
@@ -260,7 +260,8 @@ public FileSplit createTestFile(String filePath,
             HiveOutputFormat outputFormat,
             @SuppressWarnings("deprecation") SerDe serDe,
             String compressionCodec,
-            List<TestColumn> testColumns)
+            List<TestColumn> testColumns,
+            int numRows)
             throws Exception
     {
         // filter out partition keys, which are not written to the file
@@ -308,7 +309,7 @@ public void progress()
 
         List<StructField> fields = ImmutableList.copyOf(objectInspector.getAllStructFieldRefs());
 
-        for (int rowNumber = 0; rowNumber < NUM_ROWS; rowNumber++) {
+        for (int rowNumber = 0; rowNumber < numRows; rowNumber++) {
             for (int i = 0; i < testColumns.size(); i++) {
                 Object writeValue = testColumns.get(i).getWriteValue();
                 if (writeValue instanceof Slice) {
@@ -331,10 +332,10 @@ public void progress()
         return new FileSplit(path, 0, file.length(), new String[0]);
     }
 
-    protected void checkCursor(RecordCursor cursor, List<TestColumn> testColumns)
+    protected void checkCursor(RecordCursor cursor, List<TestColumn> testColumns, int numRows)
             throws IOException
     {
-        for (int row = 0; row < NUM_ROWS; row++) {
+        for (int row = 0; row < numRows; row++) {
             assertTrue(cursor.advanceNextPosition());
             for (int i = 0, testColumnsSize = testColumns.size(); i < testColumnsSize; i++) {
                 TestColumn testColumn = testColumns.get(i);
diff --git a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
index 8f7704491c63d..cabe63c73283c 100644
--- a/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
+++ b/presto-hive/src/test/java/com/facebook/presto/hive/TestHiveFileFormats.java
@@ -22,11 +22,14 @@
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.TupleDomain;
 import com.facebook.presto.spi.type.TimeZoneKey;
+import com.facebook.presto.type.ArrayType;
+import com.facebook.presto.type.RowType;
 import com.facebook.presto.type.TypeRegistry;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
@@ -52,10 +55,15 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.TimeZone;
 
 import static com.facebook.presto.hive.HiveTestUtils.getTypes;
+import static com.facebook.presto.hive.HiveTestUtils.arraySliceOf;
+import static com.facebook.presto.hive.HiveTestUtils.rowSliceOf;
+import static com.facebook.presto.spi.type.BigintType.BIGINT;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.base.Predicates.not;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
@@ -63,6 +71,10 @@
 import static java.util.stream.Collectors.toList;
 import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
 import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector;
+import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
+import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;
+import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaIntObjectInspector;
 import static org.testng.Assert.assertEquals;
 
 public class TestHiveFileFormats
@@ -102,9 +114,9 @@ public boolean apply(TestColumn testColumn)
         SerDe serde = new ColumnarSerDe();
         File file = File.createTempFile("presto_test", "rc-text");
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
-            testCursorProvider(new ColumnarTextHiveRecordCursorProvider(), split, inputFormat, serde, testColumns);
-            testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, testColumns);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
+            testCursorProvider(new ColumnarTextHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
+            testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
         }
         finally {
             //noinspection ResultOfMethodCallIgnored
@@ -123,7 +135,7 @@ public void testRcTextPageSource()
         File file = File.createTempFile("presto_test", "rc-binary");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
             testPageSourceFactory(new RcFilePageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
         }
         finally {
@@ -142,9 +154,9 @@ public void testRCBinary()
         SerDe serde = new LazyBinaryColumnarSerDe();
         File file = File.createTempFile("presto_test", "rc-binary");
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
-            testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
-            testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
+            testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
+            testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
         }
         finally {
             //noinspection ResultOfMethodCallIgnored
@@ -163,7 +175,7 @@ public void testRcBinaryPageSource()
         File file = File.createTempFile("presto_test", "rc-binary");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
             testPageSourceFactory(new RcFilePageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
         }
         finally {
@@ -183,8 +195,8 @@ public void testOrc()
         File file = File.createTempFile("presto_test", "orc");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
-            testCursorProvider(new OrcRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
+            testCursorProvider(new OrcRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
         }
         finally {
             //noinspection ResultOfMethodCallIgnored
@@ -203,7 +215,7 @@ public void testOrcDataStream()
         File file = File.createTempFile("presto_test", "orc");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
             testPageSourceFactory(new OrcPageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
         }
         finally {
@@ -237,12 +249,12 @@ public boolean apply(TestColumn testColumn)
         InputFormat inputFormat = new MapredParquetInputFormat();
         @SuppressWarnings("deprecation")
         SerDe serde = new ParquetHiveSerDe();
-        File file = File.createTempFile("presto_test", "ord");
+        File file = File.createTempFile("presto_test", "parquet");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
             HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(false);
-            testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns);
+            testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns, NUM_ROWS);
         }
         finally {
             //noinspection ResultOfMethodCallIgnored
@@ -250,6 +262,57 @@ public boolean apply(TestColumn testColumn)
         }
     }
 
+    @Test
+    public void testParquetThrift()
+            throws Exception
+    {
+        RowType nameType = new RowType(ImmutableList.of(VARCHAR, VARCHAR), Optional.empty());
+        RowType phoneType = new RowType(ImmutableList.of(VARCHAR, VARCHAR), Optional.empty());
+        RowType personType = new RowType(ImmutableList.of(nameType, BIGINT, VARCHAR, new ArrayType(phoneType)), Optional.empty());
+
+        List<TestColumn> testColumns = ImmutableList.of(
+            new TestColumn(
+                "persons",
+                getStandardListObjectInspector(
+                    getStandardStructObjectInspector(
+                        ImmutableList.of("name", "id", "email", "phones"),
+                        ImmutableList.of(
+                            getStandardStructObjectInspector(
+                                ImmutableList.of("first_name", "last_name"),
+                                ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector)
+                            ),
+                            javaIntObjectInspector,
+                            javaStringObjectInspector,
+                            getStandardListObjectInspector(
+                                getStandardStructObjectInspector(
+                                    ImmutableList.of("number", "type"),
+                                    ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector)
+                                )
+                            )
+                        )
+                    )
+                ),
+                null,
+                arraySliceOf(personType,
+                    rowSliceOf(ImmutableList.of(nameType, BIGINT, VARCHAR, new ArrayType(phoneType)),
+                        rowSliceOf(ImmutableList.of(VARCHAR, VARCHAR), "Bob", "Roberts"),
+                        0,
+                        "bob.roberts@example.com",
+                        arraySliceOf(phoneType, rowSliceOf(ImmutableList.of(VARCHAR, VARCHAR), "1234567890", null))
+                    )
+                )
+            )
+        );
+
+        InputFormat inputFormat = new MapredParquetInputFormat();
+        @SuppressWarnings("deprecation")
+        SerDe serde = new ParquetHiveSerDe();
+        File file = new File(this.getClass().getClassLoader().getResource("addressbook.parquet").getPath());
+        FileSplit split = new FileSplit(new Path(file.getAbsolutePath()), 0, file.length(), new String[0]);
+        HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(false);
+        testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns, 1);
+    }
+
     @Test
     public void testDwrf()
             throws Exception
@@ -272,8 +335,8 @@ public boolean apply(TestColumn testColumn)
         File file = File.createTempFile("presto_test", "dwrf");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
-            testCursorProvider(new DwrfRecordCursorProvider(), split, inputFormat, serde, testColumns);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
+            testCursorProvider(new DwrfRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
         }
         finally {
             //noinspection ResultOfMethodCallIgnored
@@ -303,7 +366,7 @@ public boolean apply(TestColumn testColumn)
         File file = File.createTempFile("presto_test", "dwrf");
         file.delete();
         try {
-            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
+            FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
             testPageSourceFactory(new DwrfPageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, testColumns);
         }
         finally {
@@ -316,7 +379,8 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
             FileSplit split,
             InputFormat inputFormat,
             @SuppressWarnings("deprecation") SerDe serde,
-            List<TestColumn> testColumns)
+            List<TestColumn> testColumns,
+            int numRows)
             throws IOException
     {
         Properties splitProperties = new Properties();
@@ -344,7 +408,7 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
                 DateTimeZone.getDefault(),
                 TYPE_MANAGER).get();
 
-        checkCursor(cursor, testColumns);
+        checkCursor(cursor, testColumns, numRows);
     }
 
     private void testPageSourceFactory(HivePageSourceFactory sourceFactory, FileSplit split, InputFormat inputFormat, SerDe serde, List<TestColumn> testColumns)
diff --git a/presto-hive/src/test/resources/addressbook.parquet b/presto-hive/src/test/resources/addressbook.parquet
new file mode 100644
index 0000000000000..0f5b8e0c07dc8
Binary files /dev/null and b/presto-hive/src/test/resources/addressbook.parquet differ
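The new testParquetThrift test reads the checked-in addressbook.parquet resource (a single row, hence the explicit row count of 1) instead of a file generated by createTestFile. One way to see why the converter change is needed is to dump that file's schema. The sketch below is illustrative only: the class name and relative path are assumptions, it relies on the same pre-relocation parquet-mr API used elsewhere in this patch, and the expectation that the lists appear as legacy repeated "<field>_tuple" groups (rather than the spec's list/element wrapper) is an assumption about how the resource was written.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;

public final class PrintAddressbookSchema
{
    public static void main(String[] args)
            throws Exception
    {
        // Path to the checked-in test resource; adjust to your local checkout.
        Path path = new Path("presto-hive/src/test/resources/addressbook.parquet");

        // Read only the footer, which carries the file's Parquet schema.
        ParquetMetadata footer = ParquetFileReader.readFooter(new Configuration(), path);
        MessageType schema = footer.getFileMetaData().getSchema();

        // For a parquet-thrift-written file the list fields are expected to show up in the
        // legacy layout accepted by the ParquetListConverter change above.
        System.out.println(schema);
    }
}
```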