Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions parquet-avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Apache Avro integration
| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. |
| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. |
| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.<br/>The default value is `true`. |
| `parquet.avro.readInt96AsFixed` | `boolean` | Flag for handling the `INT96` Parquet type. `true` for converting it to the `fixed` Avro type, `false` for not handling the `INT96` type (an exception is thrown).<br/>The default value is `false`.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |

### Configuration for writing

Expand All @@ -42,3 +43,4 @@ Apache Avro integration
| `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels, no null values are available at the element level.<br/>The default value is `true`. |
| `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.<br/>The default value is `true`. |
| `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) when an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.<br/>The default value is `false`. |
| `parquet.avro.writeFixedAsInt96` | `String` | Comma separated list of paths pointing to Avro schema elements which are to be converted to `INT96` Parquet types.<br/>The path is a `'.'` separated list of field names and does not contain the name of the schema nor the namespace. The type of the referenced schema elements must be `fixed` with the size of 12 bytes.<br/>**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,20 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
Expand Down Expand Up @@ -77,6 +81,7 @@ public class AvroSchemaConverter {
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;
private final boolean readInt96AsFixed;
private final Set<String> pathsToInt96;

public AvroSchemaConverter() {
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
Expand All @@ -93,6 +98,7 @@ public AvroSchemaConverter() {
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
this.pathsToInt96 = Collections.emptySet();
}

public AvroSchemaConverter(Configuration conf) {
Expand All @@ -102,6 +108,7 @@ public AvroSchemaConverter(Configuration conf) {
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0])));
}

/**
Expand Down Expand Up @@ -134,26 +141,26 @@ public MessageType convert(Schema avroSchema) {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
}

private List<Type> convertFields(List<Schema.Field> fields) {
private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) {
List<Type> types = new ArrayList<Type>();
for (Schema.Field field : fields) {
if (field.schema().getType().equals(Schema.Type.NULL)) {
continue; // Avro nulls are not encoded, unless they are null unions
}
types.add(convertField(field));
types.add(convertField(field, appendPath(schemaPath, field.name())));
}
return types;
}

private Type convertField(String fieldName, Schema schema) {
return convertField(fieldName, schema, Type.Repetition.REQUIRED);
private Type convertField(String fieldName, Schema schema, String schemaPath) {
return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath);
}

@SuppressWarnings("deprecation")
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
Expand All @@ -177,26 +184,33 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
convertField("array", schema.getElementType(), REPEATED));
convertField("array", schema.getElementType(), REPEATED, schemaPath));
} else {
return ConversionPatterns.listOfElements(repetition, fieldName,
convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
}
} else if (type.equals(Schema.Type.MAP)) {
Type valType = convertField("value", schema.getValueType());
Type valType = convertField("value", schema.getValueType(), schemaPath);
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.length(schema.getFixedSize());
if (pathsToInt96.contains(schemaPath)) {
if (schema.getFixedSize() != 12) {
throw new IllegalArgumentException(
"The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion");
}
builder = Types.primitive(PrimitiveTypeName.INT96, repetition);
} else {
builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
}
} else if (type.equals(Schema.Type.UNION)) {
return convertUnion(fieldName, schema, repetition);
return convertUnion(fieldName, schema, repetition, schemaPath);
} else {
throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
Expand All @@ -218,7 +232,7 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
return builder.named(fieldName);
}

private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
// Found any schemas in the union? Required for the edge case, where the union contains only a single type.
boolean foundNullSchema = false;
Expand All @@ -239,25 +253,26 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet
throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");

case 1:
return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) :
convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) :
convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);

default: // complex union type
return convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
}
}

private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas) {
private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List<Schema> nonNullSchemas,
String schemaPath) {
List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}

private Type convertField(Schema.Field field) {
return convertField(field.name(), field.schema());
private Type convertField(Schema.Field field, String schemaPath) {
return convertField(field.name(), field.schema(), schemaPath);
}

public Schema convert(MessageType parquetSchema) {
Expand Down Expand Up @@ -314,7 +329,7 @@ public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
}
throw new IllegalArgumentException(
"INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
"INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
}
@Override
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
Expand Down Expand Up @@ -524,4 +539,11 @@ private static Schema optional(Schema original) {
Schema.create(Schema.Type.NULL),
original));
}

/**
 * Builds the dot-separated schema path of a field nested under {@code path}.
 *
 * @param path the path of the enclosing element; {@code null} or empty when there is no parent
 * @param fieldName the name of the field to append
 * @return {@code fieldName} alone when there is no parent path, otherwise
 *         {@code path + '.' + fieldName}
 */
private static String appendPath(String path, String fieldName) {
  boolean hasParent = path != null && !path.isEmpty();
  return hasParent ? path + '.' + fieldName : fieldName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ public static void setAvroDataSupplier(
public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
static final boolean WRITE_PARQUET_UUID_DEFAULT = false;

// Configuration key that enables writing Parquet INT96 from a 12-byte Avro fixed.
// The value is a comma separated list of paths to the Avro schema elements to convert;
// each path is a '.' separated list of field names (without the schema name or namespace).
// NOTE: the INT96 Parquet type is deprecated; this option only exists to support old data.
public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96";

private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
private static final String MAP_VALUE_NAME = "value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -824,6 +825,37 @@ public void testUUIDTypeWithParquetUUID() throws Exception {
"}\n");
}

/**
 * Checks the Avro-to-Parquet conversion of {@code fixed} fields listed in
 * {@code WRITE_FIXED_AS_INT96}.
 *
 * <p>Listed 12-byte fixed fields (top level, inside a nested record, as array elements and as
 * map values) are expected to become {@code int96}; fixed fields that are not listed stay
 * {@code fixed_len_byte_array}. Listing a fixed field whose size is not 12 bytes is expected
 * to fail with an {@link IllegalArgumentException}.
 */
@Test
public void testAvroFixed12AsParquetInt96Type() throws Exception {
  final Schema schema;
  // Schema.Parser#parse(InputStream) does not close the stream, so release it here.
  try (java.io.InputStream in = Resources.getResource("fixedToInt96.avsc").openStream()) {
    schema = new Schema.Parser().parse(in);
  }

  Configuration conf = new Configuration();
  conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional",
      "mynestedrecord.mymap");
  testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n"
      + " required int96 int96;\n"
      + " required fixed_len_byte_array(12) notanint96;\n"
      + " required group mynestedrecord {\n"
      + " required int96 int96inrecord;\n"
      + " required group myarrayofoptional (LIST) {\n"
      + " repeated int96 array;\n"
      + " }\n"
      + " required group mymap (MAP) {\n"
      + " repeated group key_value (MAP_KEY_VALUE) {\n"
      + " required binary key (STRING);\n"
      + " required int96 value;\n"
      + " }\n"
      + " }\n"
      + " }\n"
      + " required fixed_len_byte_array(1) onebytefixed;\n"
      + "}");

  // Re-point the option at a 1-byte fixed: the converter must reject it.
  conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed");
  assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes",
      IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema));
}

public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
Expand Down
80 changes: 80 additions & 0 deletions parquet-avro/src/test/resources/fixedToInt96.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"name": "fixedToInt96",
"namespace": "org.apache.parquet.avro",
"type": "record",
"fields": [
{
"name": "int96",
"type": {
"type": "fixed",
"name": "ignored1",
"namespace": "",
"size": 12
}
},
{
"name": "notanint96",
"type": {
"type": "fixed",
"name": "ignored2",
"namespace": "",
"size": 12
}
},
{
"name": "mynestedrecord",
"type": {
"type": "record",
"name": "ignored3",
"namespace": "",
"fields": [
{
"name": "int96inrecord",
"type": {
"type": "fixed",
"name": "ignored4",
"namespace": "",
"size": 12
}
},
{
"name": "myarrayofoptional",
"type": {
"type": "array",
"items": [
"null",
{
"type": "fixed",
"name": "ignored5",
"namespace": "",
"size": 12
}
]
}
},
{
"name": "mymap",
"type": {
"type": "map",
"values": {
"type": "fixed",
"name": "ignored6",
"namespace": "",
"size": 12
}
}
}
]
}
},
{
"name": "onebytefixed",
"type": {
"type": "fixed",
"name": "ignored7",
"namespace": "",
"size": 1
}
}
]
}