Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ public static GroupColumnIO getMapKeyValueColumn(GroupColumnIO groupColumnIO)
return groupColumnIO;
}

/* For backward-compatibility, the type of elements in LIST-annotated structures should always be determined by the following rules:
* 1. If the repeated field is not a group, then its type is the element type and elements are required.
* 2. If the repeated field is a group with multiple fields, then its type is the element type and elements are required.
* 3. If the repeated field is a group with one field and is named either array or uses the LIST-annotated group's name with _tuple appended then the repeated type is the element type and elements are required.
* 4. Otherwise, the repeated field's type is the element type with the repeated field's repetition.
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
*/
public static ColumnIO getArrayElementColumn(ColumnIO columnIO)
{
while (columnIO instanceof GroupColumnIO && !columnIO.getType().isRepetition(REPEATED)) {
Expand All @@ -86,7 +93,9 @@ public static ColumnIO getArrayElementColumn(ColumnIO columnIO)
*/
if (columnIO instanceof GroupColumnIO &&
columnIO.getType().getOriginalType() == null &&
((GroupColumnIO) columnIO).getChildrenCount() == 1) {
((GroupColumnIO) columnIO).getChildrenCount() == 1 &&
!columnIO.getName().equals("array") &&
!columnIO.getName().equals(columnIO.getParent().getName() + "_tuple")) {
return ((GroupColumnIO) columnIO).getChild(0);
}

Expand Down
10 changes: 5 additions & 5 deletions presto-hive/src/main/java/parquet/io/ColumnIOConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,26 @@ public static Optional<Field> constructField(Type type, ColumnIO columnIO)
if (ROW.equals(type.getTypeSignature().getBase())) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> parameters = type.getTypeParameters();
ImmutableList.Builder<Optional<Field>> fileldsBuilder = ImmutableList.builder();
ImmutableList.Builder<Optional<Field>> fieldsBuilder = ImmutableList.builder();
List<TypeSignatureParameter> fields = type.getTypeSignature().getParameters();
boolean structHasParameters = false;
for (int i = 0; i < fields.size(); i++) {
NamedTypeSignature namedTypeSignature = fields.get(i).getNamedTypeSignature();
String name = namedTypeSignature.getName().get().toLowerCase(Locale.ENGLISH);
Optional<Field> field = constructField(parameters.get(i), groupColumnIO.getChild(name));
structHasParameters |= field.isPresent();
fileldsBuilder.add(field);
fieldsBuilder.add(field);
}
if (structHasParameters) {
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, fileldsBuilder.build()));
return Optional.of(new GroupField(type, repetitionLevel, definitionLevel, required, fieldsBuilder.build()));
}
return Optional.empty();
}
else if (MAP.equals(type.getTypeSignature().getBase())) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
MapType mapType = (MapType) type;
GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
if (keyValueColumnIO.getChildrenCount() < 2) {
if (keyValueColumnIO.getChildrenCount() != 2) {
return Optional.empty();
}
Optional<Field> keyField = constructField(mapType.getKeyType(), keyValueColumnIO.getChild(0));
Expand All @@ -83,7 +83,7 @@ else if (MAP.equals(type.getTypeSignature().getBase())) {
else if (ARRAY.equals(type.getTypeSignature().getBase())) {
GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
List<Type> types = type.getTypeParameters();
if (groupColumnIO.getChildrenCount() == 0) {
if (groupColumnIO.getChildrenCount() != 1) {
return Optional.empty();
}
Optional<Field> field = constructField(types.get(0), getArrayElementColumn(groupColumnIO.getChild(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testCustomSchemaArrayOfStucts()
Iterable<List<List>> values = createTestArrays(structs);
List<String> structFieldNames = asList("a", "b", "c");
Type structType = RowType.from(asList(field("a", BIGINT), field("b", BOOLEAN), field("c", VARCHAR)));
tester.testRoundTrip(
tester.testSingleLevelArrayRoundTrip(
getStandardListObjectInspector(getStandardStructObjectInspector(structFieldNames, asList(javaLongObjectInspector, javaBooleanObjectInspector, javaStringObjectInspector))),
values, values, new ArrayType(structType), Optional.of(customSchemaArrayOfStucts));
}
Expand Down Expand Up @@ -379,6 +379,44 @@ public void testSingleLevelArrayOfMapOfStruct()
values, values, new ArrayType(mapType(INTEGER, structType)));
}

@Test
public void testSingleLevelArrayOfStructOfSingleElement()
throws Exception
{
Iterable<List> structs = createTestStructs(transform(intsBetween(0, 31_234), Object::toString));
Iterable<List<List>> values = createTestArrays(structs);
List<String> structFieldNames = singletonList("test");
Type structType = RowType.from(singletonList(field("test", VARCHAR)));
tester.testRoundTrip(
getStandardListObjectInspector(getStandardStructObjectInspector(structFieldNames, singletonList(javaStringObjectInspector))),
values, values, new ArrayType(structType));
tester.testSingleLevelArraySchemaRoundTrip(
getStandardListObjectInspector(getStandardStructObjectInspector(structFieldNames, singletonList(javaStringObjectInspector))),
values, values, new ArrayType(structType));
}

@Test
public void testSingleLevelArrayOfStructOfStructOfSingleElement()
throws Exception
{
Iterable<List> structs = createTestStructs(transform(intsBetween(0, 31_234), Object::toString));
Iterable<List> structsOfStructs = createTestStructs(structs);
Iterable<List<List>> values = createTestArrays(structsOfStructs);
List<String> structFieldNames = singletonList("test");
List<String> structsOfStructsFieldNames = singletonList("test");
Type structType = RowType.from(singletonList(field("test", VARCHAR)));
Type structsOfStructsType = RowType.from(singletonList(field("test", structType)));
ObjectInspector structObjectInspector = getStandardStructObjectInspector(structFieldNames, singletonList(javaStringObjectInspector));
tester.testRoundTrip(
getStandardListObjectInspector(
getStandardStructObjectInspector(structsOfStructsFieldNames, singletonList(structObjectInspector))),
values, values, new ArrayType(structsOfStructsType));
tester.testSingleLevelArraySchemaRoundTrip(
getStandardListObjectInspector(
getStandardStructObjectInspector(structsOfStructsFieldNames, singletonList(structObjectInspector))),
values, values, new ArrayType(structsOfStructsType));
}

@Test
public void testArrayOfMapOfArray()
throws Exception
Expand Down Expand Up @@ -1085,7 +1123,7 @@ public void testSchemaWithRequiredOptionalRequired2Fields()
ObjectInspector eInspector = getStandardStructObjectInspector(singletonList("f"), singletonList(fInspector));
tester.testRoundTrip(asList(aInspector, eInspector),
new Iterable<?>[] {aValues, eValues}, new Iterable<?>[] {aValues, eValues},
asList("a", "e"), asList(aType, eType), Optional.of(parquetSchema));
asList("a", "e"), asList(aType, eType), Optional.of(parquetSchema), false);
}

@Test
Expand All @@ -1098,7 +1136,7 @@ public void testOldAvroArray()
" }" +
"} ");
Iterable<List<Integer>> nonNullArrayElements = createTestArrays(intsBetween(0, 31_234));
tester.testRoundTrip(getStandardListObjectInspector(javaIntObjectInspector), nonNullArrayElements, nonNullArrayElements, new ArrayType(INTEGER), Optional.of(parquetMrAvroSchema));
tester.testSingleLevelArrayRoundTrip(getStandardListObjectInspector(javaIntObjectInspector), nonNullArrayElements, nonNullArrayElements, new ArrayType(INTEGER), Optional.of(parquetMrAvroSchema));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,19 @@ public void testSingleLevelArraySchemaRoundTrip(ObjectInspector objectInspector,
{
ArrayList<TypeInfo> typeInfos = TypeInfoUtils.getTypeInfosFromTypeString(objectInspector.getTypeName());
MessageType schema = SingleLevelArraySchemaConverter.convert(TEST_COLUMN, typeInfos);
testRoundTrip(objectInspector, writeValues, readValues, type, Optional.of(schema));
testSingleLevelArrayRoundTrip(objectInspector, writeValues, readValues, type, Optional.of(schema));
if (objectInspector.getTypeName().contains("map<")) {
schema = SingleLevelArrayMapKeyValuesSchemaConverter.convert(TEST_COLUMN, typeInfos);
testRoundTrip(objectInspector, writeValues, readValues, type, Optional.of(schema));
testSingleLevelArrayRoundTrip(objectInspector, writeValues, readValues, type, Optional.of(schema));
}
}

public void testRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, Type type)
throws Exception
{
// just the values
testRoundTripType(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, TEST_COLUMN, singletonList(type), Optional.empty());
testRoundTripType(singletonList(objectInspector), new Iterable<?>[] {writeValues},
new Iterable<?>[] {readValues}, TEST_COLUMN, singletonList(type), Optional.empty(), false);

// all nulls
assertRoundTrip(singletonList(objectInspector), new Iterable<?>[] {transform(writeValues, constant(null))},
Expand All @@ -173,7 +174,7 @@ public void testRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValu
MessageType schema = MapKeyValuesSchemaConverter.convert(TEST_COLUMN, typeInfos);
// just the values
testRoundTripType(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {
readValues}, TEST_COLUMN, singletonList(type), Optional.of(schema));
readValues}, TEST_COLUMN, singletonList(type), Optional.of(schema), false);

// all nulls
assertRoundTrip(singletonList(objectInspector), new Iterable<?>[] {transform(writeValues, constant(null))},
Expand All @@ -184,34 +185,40 @@ public void testRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValu
public void testRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, Type type, Optional<MessageType> parquetSchema)
throws Exception
{
testRoundTrip(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, TEST_COLUMN, singletonList(type), parquetSchema);
testRoundTrip(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, TEST_COLUMN, singletonList(type), parquetSchema, false);
}

public void testRoundTrip(List<ObjectInspector> objectInspectors, Iterable<?>[] writeValues, Iterable<?>[] readValues, List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema)
public void testSingleLevelArrayRoundTrip(ObjectInspector objectInspector, Iterable<?> writeValues, Iterable<?> readValues, Type type, Optional<MessageType> parquetSchema)
throws Exception
{
testRoundTrip(singletonList(objectInspector), new Iterable<?>[] {writeValues}, new Iterable<?>[] {readValues}, TEST_COLUMN, singletonList(type), parquetSchema, true);
}

public void testRoundTrip(List<ObjectInspector> objectInspectors, Iterable<?>[] writeValues, Iterable<?>[] readValues, List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema, boolean singleLevelArray)
throws Exception
{
// just the values
testRoundTripType(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema);
testRoundTripType(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, singleLevelArray);

// all nulls
assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema);
assertRoundTrip(objectInspectors, transformToNulls(writeValues), transformToNulls(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray);
}

private void testRoundTripType(List<ObjectInspector> objectInspectors, Iterable<?>[] writeValues, Iterable<?>[] readValues,
List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema)
List<String> columnNames, List<Type> columnTypes, Optional<MessageType> parquetSchema, boolean singleLevelArray)
throws Exception
{
// forward order
assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema);
assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, singleLevelArray);

// reverse order
assertRoundTrip(objectInspectors, reverse(writeValues), reverse(readValues), columnNames, columnTypes, parquetSchema);
assertRoundTrip(objectInspectors, reverse(writeValues), reverse(readValues), columnNames, columnTypes, parquetSchema, singleLevelArray);

// forward order with nulls
assertRoundTrip(objectInspectors, insertNullEvery(5, writeValues), insertNullEvery(5, readValues), columnNames, columnTypes, parquetSchema);
assertRoundTrip(objectInspectors, insertNullEvery(5, writeValues), insertNullEvery(5, readValues), columnNames, columnTypes, parquetSchema, singleLevelArray);

// reverse order with nulls
assertRoundTrip(objectInspectors, insertNullEvery(5, reverse(writeValues)), insertNullEvery(5, reverse(readValues)), columnNames, columnTypes, parquetSchema);
assertRoundTrip(objectInspectors, insertNullEvery(5, reverse(writeValues)), insertNullEvery(5, reverse(readValues)), columnNames, columnTypes, parquetSchema, singleLevelArray);
}

void assertRoundTrip(List<ObjectInspector> objectInspectors,
Expand All @@ -221,6 +228,18 @@ void assertRoundTrip(List<ObjectInspector> objectInspectors,
List<Type> columnTypes,
Optional<MessageType> parquetSchema)
throws Exception
{
assertRoundTrip(objectInspectors, writeValues, readValues, columnNames, columnTypes, parquetSchema, false);
}

void assertRoundTrip(List<ObjectInspector> objectInspectors,
Iterable<?>[] writeValues,
Iterable<?>[] readValues,
List<String> columnNames,
List<Type> columnTypes,
Optional<MessageType> parquetSchema,
boolean singleLevelArray)
throws Exception
{
for (WriterVersion version : versions) {
for (CompressionCodecName compressionCodecName : compressions) {
Expand All @@ -236,7 +255,8 @@ void assertRoundTrip(List<ObjectInspector> objectInspectors,
createTableProperties(columnNames, objectInspectors),
getStandardStructObjectInspector(columnNames, objectInspectors),
getIterators(writeValues),
parquetSchema);
parquetSchema,
singleLevelArray);
assertFileContents(
tempFile.getFile(),
getIterators(readValues),
Expand Down Expand Up @@ -383,10 +403,11 @@ private static DataSize writeParquetColumn(JobConf jobConf,
Properties tableProperties,
SettableStructObjectInspector objectInspector,
Iterator<?>[] valuesByField,
Optional<MessageType> parquetSchema)
Optional<MessageType> parquetSchema,
boolean singleLevelArray)
throws Exception
{
RecordWriter recordWriter = new TestMapredParquetOutputFormat(parquetSchema)
RecordWriter recordWriter = new TestMapredParquetOutputFormat(parquetSchema, singleLevelArray)
.getHiveRecordWriter(
jobConf,
new Path(outputFile.toURI()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ else if (typeInfo.getCategory().equals(Category.UNION)) {
private static GroupType convertArrayType(final String name, final ListTypeInfo typeInfo, final Repetition repetition)
{
final TypeInfo subType = typeInfo.getListElementTypeInfo();
return listWrapper(name, OriginalType.LIST, convertType("array_element", subType, Repetition.REPEATED), repetition);
return listWrapper(name, OriginalType.LIST, convertType("array", subType, Repetition.REPEATED), repetition);
}

// An optional group containing multiple elements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class TestDataWritableWriteSupport
{
private TestDataWritableWriter writer;
private MessageType schema;
private boolean singleLevelArray;

public TestDataWritableWriteSupport(boolean singleLevelArray)
{
this.singleLevelArray = singleLevelArray;
}

@Override
public WriteContext init(final Configuration configuration)
Expand All @@ -42,7 +48,7 @@ public WriteContext init(final Configuration configuration)
@Override
public void prepareForWrite(final RecordConsumer recordConsumer)
{
writer = new TestDataWritableWriter(recordConsumer, schema);
writer = new TestDataWritableWriter(recordConsumer, schema, singleLevelArray);
}

@Override
Expand Down
Loading