diff --git a/parquet-avro/README.md b/parquet-avro/README.md
index 8b1cca2550..4f67491584 100644
--- a/parquet-avro/README.md
+++ b/parquet-avro/README.md
@@ -32,6 +32,7 @@ Apache Avro integration
| `parquet.avro.read.schema` | `String` | The Avro schema to be used for reading. It shall be compatible with the file schema. The file schema will be used directly if not set. |
| `parquet.avro.projection` | `String` | The Avro schema to be used for projection. |
| `parquet.avro.compatible` | `boolean` | Flag for compatibility mode. `true` for materializing Avro `IndexedRecord` objects, `false` for materializing the related objects for either generic, specific, or reflect records.
The default value is `true`. |
+| `parquet.avro.readInt96AsFixed` | `boolean` | Flag for handling the `INT96` Parquet type. `true` for converting it to the `fixed` Avro type, `false` for not handling `INT96` (an exception is thrown).
The default value is `false`.
**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
### Configuration for writing
@@ -42,3 +43,4 @@ Apache Avro integration
| `parquet.avro.write-old-list-structure` | `boolean` | Flag whether to write list structures in the old way (2 levels) or the new one (3 levels). When writing at 2 levels no null values are available at the element level.
The default value is `true` |
| `parquet.avro.add-list-element-records` | `boolean` | Flag whether to assume that any repeated element in the schema is a list element.
The default value is `true`. |
| `parquet.avro.write-parquet-uuid` | `boolean` | Flag whether to write the [Parquet UUID logical type](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#uuid) in case of an [Avro UUID type](https://avro.apache.org/docs/current/spec.html#UUID) is present.
The default value is `false`. |
+| `parquet.avro.writeFixedAsInt96` | `String` | Comma-separated list of paths pointing to the Avro schema elements that are to be converted to the `INT96` Parquet type.
Each path is a `'.'`-separated list of field names and contains neither the name of the schema nor the namespace. The referenced schema elements must be of type `fixed` with a size of 12 bytes.
**NOTE: The `INT96` Parquet type is deprecated. This option is only to support old data.** |
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 4c06e9c9b2..7d1f3cab9f 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -35,16 +35,20 @@
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_PARQUET_UUID;
@@ -77,6 +81,7 @@ public class AvroSchemaConverter {
private final boolean writeOldListStructure;
private final boolean writeParquetUUID;
private final boolean readInt96AsFixed;
+ private final Set pathsToInt96;
public AvroSchemaConverter() {
this(ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -93,6 +98,7 @@ public AvroSchemaConverter() {
this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT;
this.readInt96AsFixed = READ_INT96_AS_FIXED_DEFAULT;
+ this.pathsToInt96 = Collections.emptySet();
}
public AvroSchemaConverter(Configuration conf) {
@@ -102,6 +108,7 @@ public AvroSchemaConverter(Configuration conf) {
WRITE_OLD_LIST_STRUCTURE, WRITE_OLD_LIST_STRUCTURE_DEFAULT);
this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT);
this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT);
+ this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0])));
}
/**
@@ -134,26 +141,26 @@ public MessageType convert(Schema avroSchema) {
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
- return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields()));
+ return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), ""));
}
- private List convertFields(List fields) {
+ private List convertFields(List fields, String schemaPath) {
List types = new ArrayList();
for (Schema.Field field : fields) {
if (field.schema().getType().equals(Schema.Type.NULL)) {
continue; // Avro nulls are not encoded, unless they are null unions
}
- types.add(convertField(field));
+ types.add(convertField(field, appendPath(schemaPath, field.name())));
}
return types;
}
- private Type convertField(String fieldName, Schema schema) {
- return convertField(fieldName, schema, Type.Repetition.REQUIRED);
+ private Type convertField(String fieldName, Schema schema, String schemaPath) {
+ return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath);
}
@SuppressWarnings("deprecation")
- private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+ private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
Types.PrimitiveBuilder builder;
Schema.Type type = schema.getType();
LogicalType logicalType = schema.getLogicalType();
@@ -177,26 +184,33 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
builder = Types.primitive(BINARY, repetition).as(stringType());
}
} else if (type.equals(Schema.Type.RECORD)) {
- return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
+ return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath));
} else if (type.equals(Schema.Type.ENUM)) {
builder = Types.primitive(BINARY, repetition).as(enumType());
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
- convertField("array", schema.getElementType(), REPEATED));
+ convertField("array", schema.getElementType(), REPEATED, schemaPath));
} else {
return ConversionPatterns.listOfElements(repetition, fieldName,
- convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType()));
+ convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
}
} else if (type.equals(Schema.Type.MAP)) {
- Type valType = convertField("value", schema.getValueType());
+ Type valType = convertField("value", schema.getValueType(), schemaPath);
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
- builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
- .length(schema.getFixedSize());
+ if (pathsToInt96.contains(schemaPath)) {
+ if (schema.getFixedSize() != 12) {
+ throw new IllegalArgumentException(
+ "The size of the fixed type field " + schemaPath + " must be 12 bytes for INT96 conversion");
+ }
+ builder = Types.primitive(PrimitiveTypeName.INT96, repetition);
+ } else {
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize());
+ }
} else if (type.equals(Schema.Type.UNION)) {
- return convertUnion(fieldName, schema, repetition);
+ return convertUnion(fieldName, schema, repetition, schemaPath);
} else {
throw new UnsupportedOperationException("Cannot convert Avro type " + type);
}
@@ -218,7 +232,7 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet
return builder.named(fieldName);
}
- private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) {
List nonNullSchemas = new ArrayList(schema.getTypes().size());
// Found any schemas in the union? Required for the edge case, where the union contains only a single type.
boolean foundNullSchema = false;
@@ -239,25 +253,26 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet
throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
case 1:
- return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition) :
- convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+ return foundNullSchema ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) :
+ convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
default: // complex union type
- return convertUnionToGroupType(fieldName, repetition, nonNullSchemas);
+ return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath);
}
}
- private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List nonNullSchemas) {
+ private Type convertUnionToGroupType(String fieldName, Type.Repetition repetition, List nonNullSchemas,
+ String schemaPath) {
List unionTypes = new ArrayList(nonNullSchemas.size());
int index = 0;
for (Schema childSchema : nonNullSchemas) {
- unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+ unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath));
}
return new GroupType(repetition, fieldName, unionTypes);
}
- private Type convertField(Schema.Field field) {
- return convertField(field.name(), field.schema());
+ private Type convertField(Schema.Field field, String schemaPath) {
+ return convertField(field.name(), field.schema(), schemaPath);
}
public Schema convert(MessageType parquetSchema) {
@@ -314,7 +329,7 @@ public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
return Schema.createFixed("INT96", "INT96 represented as byte[12]", null, 12);
}
throw new IllegalArgumentException(
- "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
+ "INT96 is deprecated. As interim enable READ_INT96_AS_FIXED flag to read as byte array.");
}
@Override
public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
@@ -524,4 +539,11 @@ private static Schema optional(Schema original) {
Schema.create(Schema.Type.NULL),
original));
}
+
+ private static String appendPath(String path, String fieldName) {
+ if (path == null || path.isEmpty()) {
+ return fieldName;
+ }
+ return path + '.' + fieldName;
+ }
}
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 440658773b..82a80d31d3 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -67,6 +67,9 @@ public static void setAvroDataSupplier(
public static final String WRITE_PARQUET_UUID = "parquet.avro.write-parquet-uuid";
static final boolean WRITE_PARQUET_UUID_DEFAULT = false;
+ // Comma-separated list of Avro schema paths whose 12-byte fixed fields are written as Parquet INT96.
+ public static final String WRITE_FIXED_AS_INT96 = "parquet.avro.writeFixedAsInt96";
+
private static final String MAP_REPEATED_NAME = "key_value";
private static final String MAP_KEY_NAME = "key";
private static final String MAP_VALUE_NAME = "value";
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
index 065a63694a..1bafdec1e3 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java
@@ -44,6 +44,7 @@
import static org.apache.parquet.avro.AvroTestUtil.optionalField;
import static org.apache.parquet.avro.AvroTestUtil.primitive;
import static org.apache.parquet.avro.AvroTestUtil.record;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
import static org.apache.parquet.schema.OriginalType.DATE;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MICROS;
import static org.apache.parquet.schema.OriginalType.TIMESTAMP_MILLIS;
@@ -824,6 +825,37 @@ public void testUUIDTypeWithParquetUUID() throws Exception {
"}\n");
}
+ @Test
+ public void testAvroFixed12AsParquetInt96Type() throws Exception {
+ Schema schema = new Schema.Parser().parse(
+ Resources.getResource("fixedToInt96.avsc").openStream());
+
+ Configuration conf = new Configuration();
+ conf.setStrings(WRITE_FIXED_AS_INT96, "int96", "mynestedrecord.int96inrecord", "mynestedrecord.myarrayofoptional",
+ "mynestedrecord.mymap");
+ testAvroToParquetConversion(conf, schema, "message org.apache.parquet.avro.fixedToInt96 {\n"
+ + " required int96 int96;\n"
+ + " required fixed_len_byte_array(12) notanint96;\n"
+ + " required group mynestedrecord {\n"
+ + " required int96 int96inrecord;\n"
+ + " required group myarrayofoptional (LIST) {\n"
+ + " repeated int96 array;\n"
+ + " }\n"
+ + " required group mymap (MAP) {\n"
+ + " repeated group key_value (MAP_KEY_VALUE) {\n"
+ + " required binary key (STRING);\n"
+ + " required int96 value;\n"
+ + " }\n"
+ + " }\n"
+ + " }\n"
+ + " required fixed_len_byte_array(1) onebytefixed;\n"
+ + "}");
+
+ conf.setStrings(WRITE_FIXED_AS_INT96, "onebytefixed");
+ assertThrows("Exception should be thrown for fixed types to be converted to INT96 where the size is not 12 bytes",
+ IllegalArgumentException.class, () -> new AvroSchemaConverter(conf).convert(schema));
+ }
+
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/resources/fixedToInt96.avsc b/parquet-avro/src/test/resources/fixedToInt96.avsc
new file mode 100644
index 0000000000..97028521c8
--- /dev/null
+++ b/parquet-avro/src/test/resources/fixedToInt96.avsc
@@ -0,0 +1,80 @@
+{
+ "name": "fixedToInt96",
+ "namespace": "org.apache.parquet.avro",
+ "type": "record",
+ "fields": [
+ {
+ "name": "int96",
+ "type": {
+ "type": "fixed",
+ "name": "ignored1",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "notanint96",
+ "type": {
+ "type": "fixed",
+ "name": "ignored2",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "mynestedrecord",
+ "type": {
+ "type": "record",
+ "name": "ignored3",
+ "namespace": "",
+ "fields": [
+ {
+ "name": "int96inrecord",
+ "type": {
+ "type": "fixed",
+ "name": "ignored4",
+ "namespace": "",
+ "size": 12
+ }
+ },
+ {
+ "name": "myarrayofoptional",
+ "type": {
+ "type": "array",
+ "items": [
+ "null",
+ {
+ "type": "fixed",
+ "name": "ignored5",
+ "namespace": "",
+ "size": 12
+ }
+ ]
+ }
+ },
+ {
+ "name": "mymap",
+ "type": {
+ "type": "map",
+ "values": {
+ "type": "fixed",
+ "name": "ignored6",
+ "namespace": "",
+ "size": 12
+ }
+ }
+ }
+ ]
+ }
+ },
+ {
+ "name": "onebytefixed",
+ "type": {
+ "type": "fixed",
+ "name": "ignored7",
+ "namespace": "",
+ "size": 1
+ }
+ }
+ ]
+}