-
Notifications
You must be signed in to change notification settings - Fork 36
Support reading Avro complex union types #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d45c91c
82f06fe
bbdacef
d3bf9b7
0e06bf6
790874c
a206064
dd07a2a
67150a4
cc7efc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -79,11 +79,18 @@ private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSch | |
| private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) { | ||
| List<Schema> types = union.getTypes(); | ||
| List<T> options = Lists.newArrayListWithExpectedSize(types.size()); | ||
|
|
||
| int index = 0; | ||
| for (Schema branch : types) { | ||
| if (branch.getType() == Schema.Type.NULL) { | ||
| options.add(visit((Type) null, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type, branch, visitor)); | ||
| if (AvroSchemaUtil.isOptionSchema(union)) { | ||
| options.add(visit(type, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor)); | ||
| } | ||
| index++; | ||
|
Comment on lines
+83
to
+93
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we now maintain index outside the loop, and check if union is option (line 88) every time in the loop. May be use one loop for both branches and fields, and re-org to hoist up the union check (if that is feasible)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about re-assigning the type before the loop? |
||
| } | ||
| } | ||
| return visitor.union(type, union, options); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -96,7 +96,7 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) { | |
| Object defaultValue = field.hasDefaultValue() && !(field.defaultVal() instanceof JsonProperties.Null) ? | ||
| field.defaultVal() : null; | ||
|
|
||
| if (AvroSchemaUtil.isOptionSchema(field.schema())) { | ||
| if (AvroSchemaUtil.isOptionSchema(field.schema()) || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) { | ||
| newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, defaultValue, null)); | ||
| } else if (defaultValue != null) { | ||
| newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, defaultValue, null)); | ||
|
|
@@ -110,13 +110,26 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) { | |
|
|
||
| @Override | ||
| public Type union(Schema union, List<Type> options) { | ||
| Preconditions.checkArgument(AvroSchemaUtil.isOptionSchema(union), | ||
| "Unsupported type: non-option union: %s", union); | ||
| // records, arrays, and maps will check nullability later | ||
| if (options.get(0) == null) { | ||
| return options.get(1); | ||
| if (AvroSchemaUtil.isOptionSchema(union)) { | ||
| // Optional simple union | ||
| // records, arrays, and maps will check nullability later | ||
| if (options.get(0) == null) { | ||
| return options.get(1); | ||
| } else { | ||
| return options.get(0); | ||
| } | ||
| } else { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this else branch, if it's the corner case (one null union) I mentioned above, it will construct an empty struct, which will prob cause an exception later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it will create an empty struct. However, it is a valid Avro schema and our reader can read it without issue. Should we support it instead of guard against it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for validating this Wenye. I am fine with not guarding against it, given that avro schema also does not guard against this corner case. |
||
| return options.get(0); | ||
| // Complex union | ||
| List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(options.size()); | ||
|
|
||
| int tagIndex = 0; | ||
| for (Type type : options) { | ||
| if (type != null) { | ||
| newFields.add(Types.NestedField.optional(allocateId(), "tag_" + tagIndex++, type)); | ||
| } | ||
| } | ||
|
|
||
| return Types.StructType.of(newFields); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure I follow the reasoning why struct is the appropriate way to represent non-optional union. Should we introduce a new Types.unionType (that extends struct may be)? Because using struct per se (with optional fields) is somewhat confusing
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline with @shenodaguirguis . The reasoning is documented in design doc. The non-optional union can be represented with struct type as the following example |
||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.iceberg.avro; | ||
|
|
||
| import org.apache.avro.Schema; | ||
| import org.apache.avro.SchemaBuilder; | ||
| import org.junit.Assert; | ||
| import org.junit.Test; | ||
|
|
||
|
|
||
| public class TestAvroComplexUnion { | ||
|
|
||
| @Test | ||
| public void testRequiredComplexUnion() { | ||
| Schema avroSchema = SchemaBuilder.record("root") | ||
| .fields() | ||
| .name("unionCol") | ||
| .type() | ||
| .unionOf() | ||
| .intType() | ||
| .and() | ||
| .stringType() | ||
| .endUnion() | ||
| .noDefault() | ||
| .endRecord(); | ||
|
|
||
| org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); | ||
| String expectedIcebergSchema = "table {\n" + | ||
| " 0: unionCol: required struct<1: tag_0: optional int, 2: tag_1: optional string>\n" + "}"; | ||
|
|
||
| Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testOptionalComplexUnion() { | ||
| Schema avroSchema = SchemaBuilder.record("root") | ||
| .fields() | ||
| .name("unionCol") | ||
| .type() | ||
| .unionOf() | ||
| .nullType() | ||
| .and() | ||
| .intType() | ||
| .and() | ||
| .stringType() | ||
| .endUnion() | ||
| .noDefault() | ||
| .endRecord(); | ||
|
|
||
| org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); | ||
| String expectedIcebergSchema = | ||
| "table {\n" + " 0: unionCol: optional struct<1: tag_0: optional int, 2: tag_1: optional string>\n" + "}"; | ||
|
|
||
| Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testSingleComponentUnion() { | ||
| Schema avroSchema = SchemaBuilder.record("root") | ||
| .fields() | ||
| .name("unionCol") | ||
| .type() | ||
| .unionOf() | ||
| .intType() | ||
| .endUnion() | ||
| .noDefault() | ||
| .endRecord(); | ||
|
|
||
| org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); | ||
| String expectedIcebergSchema = "table {\n" + " 0: unionCol: required struct<1: tag_0: optional int>\n" + "}"; | ||
|
|
||
| Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testOptionSchema() { | ||
| Schema avroSchema = SchemaBuilder.record("root") | ||
| .fields() | ||
| .name("optionCol") | ||
| .type() | ||
| .unionOf() | ||
| .nullType() | ||
| .and() | ||
| .intType() | ||
| .endUnion() | ||
| .nullDefault() | ||
| .endRecord(); | ||
|
|
||
| org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); | ||
| String expectedIcebergSchema = "table {\n" + " 0: optionCol: optional int\n" + "}"; | ||
|
|
||
| Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); | ||
| } | ||
|
|
||
| @Test | ||
| public void testNullUnionSchema() { | ||
| Schema avroSchema = SchemaBuilder.record("root") | ||
| .fields() | ||
| .name("nullUnionCol") | ||
| .type() | ||
| .unionOf() | ||
| .nullType() | ||
| .endUnion() | ||
| .noDefault() | ||
| .endRecord(); | ||
|
|
||
| org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); | ||
| String expectedIcebergSchema = "table {\n" + " 0: nullUnionCol: optional struct<>\n" + "}"; | ||
|
|
||
| Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could there be a corner case where the schema is a union that only contains
null, like this:If so, it seems this will make the related code in
SchemaToType.javareturn an empty struct when converting an union.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch @rzhang10 ! While it is meaningless to have a single-type in a union, the avro
Schema.UnionSchemaconstructor does not perform any checks on the number of types, so we definitely need to guard against such cases (throw if size < 2 ?).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it can happen. For example,
[null]and[int]will pass the check. However, those are valid Avro unions and Avro does not prevent users from creating those schemas. In addition, our reader will works with those single unions. Maybe it is better to support those than guard it? We can let the user to decide whether it is useful or not.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should those be converted to structs or simple types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to convert it to struct for consistency as it is valid union in avro.