Correctly handle lists in parquet created by writers other than hive
Hive serializes Parquet lists in a particular way: it wraps the element
field in an inner repeated group (the "bag"), with the element itself
marked optional to support nulls:

    optional group listField (LIST) {
      repeated group bag {
      optional int32 array_element
      }
    }

However, other sources create lists differently, without the bag group:

    optional group listField (LIST) {
      repeated int32 listField_tuple
    }

This changes PrestoReadSupport to special-case that structure and adds a
test case for it.
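
For illustration only (not part of this commit), the sketch below builds both
shapes with parquet-mr's MessageTypeParser, using the pre-org.apache package
names (parquet.schema.*) that the changed code imports; the ListShapes class
and the message name are made up for the example:

    import parquet.schema.GroupType;
    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;

    public class ListShapes
    {
        public static void main(String[] args)
        {
            // Hive-style list: the element sits inside an inner repeated "bag" group
            MessageType hiveStyle = MessageTypeParser.parseMessageType(
                    "message doc { "
                    + "  optional group listField (LIST) { "
                    + "    repeated group bag { optional int32 array_element; } "
                    + "  } "
                    + "}");

            // Single-level list written by some other sources: the repeated field
            // is the element itself, named <listName>_tuple
            MessageType singleLevel = MessageTypeParser.parseMessageType(
                    "message doc { "
                    + "  optional group listField (LIST) { "
                    + "    repeated int32 listField_tuple; "
                    + "  } "
                    + "}");

            GroupType hiveList = hiveStyle.getType(0).asGroupType();
            GroupType flatList = singleLevel.getType(0).asGroupType();

            // The repeated child is a group in the Hive case and a primitive in the
            // single-level case, which is one of the checks the converter keys on.
            System.out.println(hiveList.getType(0).isPrimitive()); // false
            System.out.println(flatList.getType(0).isPrimitive()); // true
        }
    }

With the previous converter, the single-level shape would hit
listType.getType(0).asGroupType() on a primitive field and fail; the check
added in the diff below routes such fields straight to createConverter
instead.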
colinmarc authored and cberner committed Apr 22, 2015
1 parent 0f9009b commit 5994e35
Showing 4 changed files with 112 additions and 26 deletions.
@@ -736,7 +736,28 @@ public ParquetListConverter(String columnName, GroupType listType)
columnName,
listType.getFieldCount());

elementConverter = new ParquetListEntryConverter(columnName, listType.getType(0).asGroupType());
// The Parquet specification requires that the element value of a
// LIST type be wrapped in an inner repeated group, like so:
//
// optional group listField (LIST) {
// repeated group list {
// optional int element
// }
// }
//
// However, some parquet libraries don't follow this spec. The
// compatibility rules used here are specified in the Parquet
// documentation at http://git.io/vf3wG.
parquet.schema.Type elementType = listType.getType(0);
if (elementType.isPrimitive() ||
elementType.asGroupType().getFieldCount() > 1 ||
elementType.getName().equals("array") ||
elementType.getName().equals(listType.getName() + "_tuple")) {
elementConverter = createConverter(columnName + ".element", elementType);
}
else {
elementConverter = new ParquetListEntryConverter(columnName, elementType.asGroupType());
}
}

@Override
@@ -819,7 +840,7 @@ public Converter getConverter(int fieldIndex)
if (fieldIndex == 0) {
return (Converter) elementConverter;
}
throw new IllegalArgumentException("LIST entry field must be 0 or 1 not " + fieldIndex);
throw new IllegalArgumentException("LIST entry field must be 0 not " + fieldIndex);
}

@Override
@@ -113,7 +113,6 @@ public abstract class AbstractTestHiveFileFormats
{
private static final ConnectorSession SESSION = new ConnectorSession("user", UTC_KEY, ENGLISH, System.currentTimeMillis(), null);

private static final int NUM_ROWS = 1000;
private static final double EPSILON = 0.001;
private static final TypeManager TYPE_MANAGER = new TypeRegistry();

@@ -126,6 +125,7 @@ public abstract class AbstractTestHiveFileFormats
public static final String TIMESTAMP_STRING = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS").print(TIMESTAMP);

// TODO: support null values and determine if timestamp and binary are allowed as partition keys
public static final int NUM_ROWS = 1000;
public static final List<TestColumn> TEST_COLUMNS = ImmutableList.<TestColumn>builder()
.add(new TestColumn("p_empty_string", javaStringObjectInspector, "", Slices.EMPTY_SLICE, true))
.add(new TestColumn("p_string", javaStringObjectInspector, "test", Slices.utf8Slice("test"), true))
@@ -260,7 +260,8 @@ public FileSplit createTestFile(String filePath,
HiveOutputFormat<?, ?> outputFormat,
@SuppressWarnings("deprecation") SerDe serDe,
String compressionCodec,
List<TestColumn> testColumns)
List<TestColumn> testColumns,
int numRows)
throws Exception
{
// filter out partition keys, which are not written to the file
@@ -308,7 +309,7 @@ public void progress()

List<StructField> fields = ImmutableList.copyOf(objectInspector.getAllStructFieldRefs());

for (int rowNumber = 0; rowNumber < NUM_ROWS; rowNumber++) {
for (int rowNumber = 0; rowNumber < numRows; rowNumber++) {
for (int i = 0; i < testColumns.size(); i++) {
Object writeValue = testColumns.get(i).getWriteValue();
if (writeValue instanceof Slice) {
@@ -331,10 +332,10 @@ public void progress()
return new FileSplit(path, 0, file.length(), new String[0]);
}

protected void checkCursor(RecordCursor cursor, List<TestColumn> testColumns)
protected void checkCursor(RecordCursor cursor, List<TestColumn> testColumns, int numRows)
throws IOException
{
for (int row = 0; row < NUM_ROWS; row++) {
for (int row = 0; row < numRows; row++) {
assertTrue(cursor.advanceNextPosition());
for (int i = 0, testColumnsSize = testColumns.size(); i < testColumnsSize; i++) {
TestColumn testColumn = testColumns.get(i);
@@ -22,11 +22,14 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.TupleDomain;
import com.facebook.presto.spi.type.TimeZoneKey;
import com.facebook.presto.type.ArrayType;
import com.facebook.presto.type.RowType;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
@@ -52,17 +55,26 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.TimeZone;

import static com.facebook.presto.hive.HiveTestUtils.getTypes;
import static com.facebook.presto.hive.HiveTestUtils.arraySliceOf;
import static com.facebook.presto.hive.HiveTestUtils.rowSliceOf;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.google.common.base.Predicates.not;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static java.util.Locale.ENGLISH;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.FILE_INPUT_FORMAT;
import static org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_LIB;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaIntObjectInspector;
import static org.testng.Assert.assertEquals;

public class TestHiveFileFormats
@@ -102,9 +114,9 @@ public boolean apply(TestColumn testColumn)
SerDe serde = new ColumnarSerDe();
File file = File.createTempFile("presto_test", "rc-text");
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
testCursorProvider(new ColumnarTextHiveRecordCursorProvider(), split, inputFormat, serde, testColumns);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, testColumns);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
testCursorProvider(new ColumnarTextHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
@@ -123,7 +135,7 @@ public void testRcTextPageSource()
File file = File.createTempFile("presto_test", "rc-binary");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testPageSourceFactory(new RcFilePageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
}
finally {
@@ -142,9 +154,9 @@ public void testRCBinary()
SerDe serde = new LazyBinaryColumnarSerDe();
File file = File.createTempFile("presto_test", "rc-binary");
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testCursorProvider(new ColumnarBinaryHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
testCursorProvider(new GenericHiveRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
@@ -163,7 +175,7 @@ public void testRcBinaryPageSource()
File file = File.createTempFile("presto_test", "rc-binary");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testPageSourceFactory(new RcFilePageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
}
finally {
@@ -183,8 +195,8 @@ public void testOrc()
File file = File.createTempFile("presto_test", "orc");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
testCursorProvider(new OrcRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testCursorProvider(new OrcRecordCursorProvider(), split, inputFormat, serde, TEST_COLUMNS, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
@@ -203,7 +215,7 @@ public void testOrcDataStream()
File file = File.createTempFile("presto_test", "orc");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, TEST_COLUMNS, NUM_ROWS);
testPageSourceFactory(new OrcPageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, TEST_COLUMNS);
}
finally {
@@ -237,19 +249,70 @@ public boolean apply(TestColumn testColumn)
InputFormat<?, ?> inputFormat = new MapredParquetInputFormat();
@SuppressWarnings("deprecation")
SerDe serde = new ParquetHiveSerDe();
File file = File.createTempFile("presto_test", "ord");
File file = File.createTempFile("presto_test", "parquet");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(false);
testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns);
testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
file.delete();
}
}

@Test
public void testParquetThrift()
throws Exception
{
RowType nameType = new RowType(ImmutableList.of(VARCHAR, VARCHAR), Optional.empty());
RowType phoneType = new RowType(ImmutableList.of(VARCHAR, VARCHAR), Optional.empty());
RowType personType = new RowType(ImmutableList.of(nameType, BIGINT, VARCHAR, new ArrayType(phoneType)), Optional.empty());

List<TestColumn> testColumns = ImmutableList.<TestColumn>of(
new TestColumn(
"persons",
getStandardListObjectInspector(
getStandardStructObjectInspector(
ImmutableList.of("name", "id", "email", "phones"),
ImmutableList.<ObjectInspector>of(
getStandardStructObjectInspector(
ImmutableList.of("first_name", "last_name"),
ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector)
),
javaIntObjectInspector,
javaStringObjectInspector,
getStandardListObjectInspector(
getStandardStructObjectInspector(
ImmutableList.of("number", "type"),
ImmutableList.of(javaStringObjectInspector, javaStringObjectInspector)
)
)
)
)
),
null,
arraySliceOf(personType,
rowSliceOf(ImmutableList.of(nameType, BIGINT, VARCHAR, new ArrayType(phoneType)),
rowSliceOf(ImmutableList.of(VARCHAR, VARCHAR), "Bob", "Roberts"),
0,
"[email protected]",
arraySliceOf(phoneType, rowSliceOf(ImmutableList.of(VARCHAR, VARCHAR), "1234567890", null))
)
)
)
);

InputFormat<?, ?> inputFormat = new MapredParquetInputFormat();
@SuppressWarnings("deprecation")
SerDe serde = new ParquetHiveSerDe();
File file = new File(this.getClass().getClassLoader().getResource("addressbook.parquet").getPath());
FileSplit split = new FileSplit(new Path(file.getAbsolutePath()), 0, file.length(), new String[0]);
HiveRecordCursorProvider cursorProvider = new ParquetRecordCursorProvider(false);
testCursorProvider(cursorProvider, split, inputFormat, serde, testColumns, 1);
}

@Test
public void testDwrf()
throws Exception
@@ -272,8 +335,8 @@ public boolean apply(TestColumn testColumn)
File file = File.createTempFile("presto_test", "dwrf");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
testCursorProvider(new DwrfRecordCursorProvider(), split, inputFormat, serde, testColumns);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
testCursorProvider(new DwrfRecordCursorProvider(), split, inputFormat, serde, testColumns, NUM_ROWS);
}
finally {
//noinspection ResultOfMethodCallIgnored
@@ -303,7 +366,7 @@ public boolean apply(TestColumn testColumn)
File file = File.createTempFile("presto_test", "dwrf");
file.delete();
try {
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns);
FileSplit split = createTestFile(file.getAbsolutePath(), outputFormat, serde, null, testColumns, NUM_ROWS);
testPageSourceFactory(new DwrfPageSourceFactory(TYPE_MANAGER), split, inputFormat, serde, testColumns);
}
finally {
@@ -316,7 +379,8 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
FileSplit split,
InputFormat<?, ?> inputFormat,
@SuppressWarnings("deprecation") SerDe serde,
List<TestColumn> testColumns)
List<TestColumn> testColumns,
int numRows)
throws IOException
{
Properties splitProperties = new Properties();
@@ -344,7 +408,7 @@ private void testCursorProvider(HiveRecordCursorProvider cursorProvider,
DateTimeZone.getDefault(),
TYPE_MANAGER).get();

checkCursor(cursor, testColumns);
checkCursor(cursor, testColumns, numRows);
}

private void testPageSourceFactory(HivePageSourceFactory sourceFactory, FileSplit split, InputFormat<?, ?> inputFormat, SerDe serde, List<TestColumn> testColumns)
Binary file not shown.
