-
Notifications
You must be signed in to change notification settings - Fork 36
Support reading Avro complex union types #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| return false; | ||
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isNonOptionalUnionOptional is confusing. may be isNonOptionalUnion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isNonOptionalUnionOptional is checking whether non option union is optional. It does more than isNonOptionalUnion . For example, [“int”, “string”] is non option union but it is required. [“null”, “int”, “string”] is non option union and it is optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get the optional vs required part, but what is the non-option union part?
can we rename to isUnionOptional?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option union only refers to [null, type]. While [int, string], [int], [null, int, string] etc are all non-option unions. What this method checks is whether a non-option union is optional. I think the original name is more accurate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Option is an Iceberg (bad) term == Optional with 2 types only.
Would reversing logic and renaming to isNonOptionUniontRequired make more sense? I will leave it to your preference. But if you keep it as isNonOptionUnionOptional, please clarify in a java-doc the difference between optional and option/non-option
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should rename this method. How about isOptionalComplexUnion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, since the input is an Avro schema, should not we use Avro terminology? Also something along the lines of isNullableComplexUnion should work probably.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. Renamed it to isOptionalComplexUnion. Also added java doc to explain the semantics to make ti clearer.
|
|
||
| default: | ||
| return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); | ||
| if (iType == null || iType.isStructType()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
doesn't iType.structType() map to case RECORD: in line 37?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused by adding calling isStructType here, could you clarify the logic here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When visitUnion, each element of UNION will be visited along with iType as StructType. Thus, we need to handle iType properly. Discussed offline, we decide to move the StructType handling logic to visitUnion so that no change need to be made here.
| default: | ||
| return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); | ||
| if (iType == null || iType.isStructType()) { | ||
| return visitor.primitive(null, schema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to pass null as opposed to iType.asStructType() in case of iType.isStructType() ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When visitUnion, each element of UNION will be visited along with iType as StructType. Thus, we need to handle iType properly. Discussed offline, we decide to move the StructType handling logic to visitUnion so that no change need to be made here.
| } | ||
| } | ||
|
|
||
| return Types.StructType.of(newFields); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure I follow the reasoning why struct is the appropriate way to represent non-optional union. Should we introduce a new Types.unionType (that extends struct may be)? Because using struct per se (with optional fields) is somewhat confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed offline with @shenodaguirguis . The reasoning is documented in design doc. The non-optional union can be represented with struct type as the following example
[ “null”, “int”, “string” ] => optional struct ( 1 tag_0: optional int, 2 tag_1: optional string)
| import org.junit.Test; | ||
|
|
||
|
|
||
| public class TestAvroNonOptionalUnion { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add tests for invalid cases as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add more tests including non-option union.
| Object value = this.readers[index].read(decoder, reuse); | ||
|
|
||
| struct.update(index, value); | ||
| for (int i = 0; i < readers.length && i != index; i += 1) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we do this for all i before we set the value at index? This loop is setting null all fields before the value, but not after...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this. Change to explicitly set all fields to null before setting value at index.
rzhang10
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to add a general comment here, so I see this is to support reading unions in avro format data in spark, are we also going to support reading union in orc format in spark? If that's the case, I feel the scope will expand since we are merging hive and avro schema literal as the iceberg schema for orc format, and if the hive schema has a union it will throw and exception as unsupported.
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { | ||
| if (schema.getType() == UNION && schema.getTypes().size() != 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could there be a corner case where the schema is a union that only contains null, like this:
"type": [
"null"
]
If so, it seems this will make the related code in SchemaToType.java return an empty struct when converting an union.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch @rzhang10 ! While it is meaningless to have a single-type in a union, the avro Schema.UnionSchema constructor does not perform any checks on the number of types, so we definitely need to guard against such cases (throw if size < 2 ?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it can happen. For example, [null] and [int] will pass the check. However, those are valid Avro unions and Avro does not prevent users from creating those schemas. In addition, our reader will works with those single unions. Maybe it is better to support those than guard it? We can let the user to decide whether it is useful or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should those be converted to structs or simple types?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to convert it to struct for consistency as it is valid union in avro.
| } else { | ||
| return options.get(0); | ||
| } | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this else branch, if it's the corner case (one null union) I mentioned above, it will construct an empty struct, which will prob cause an exception later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it will create an empty struct. However, it is a valid Avro schema and our reader can read it without issue. Should we support it instead of guard against it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for validating this Wenye. I am fine with not guarding against it, given that avro schema also does not guard against this corner case.
|
|
||
| default: | ||
| return visitor.primitive(iType != null ? iType.asPrimitiveType() : null, schema); | ||
| if (iType == null || iType.isStructType()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm confused by adding calling isStructType here, could you clarify the logic here?
0536543 to
790874c
Compare
shenodaguirguis
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes @funcheetah !
LGTM in general. Other than the nits, the only - small - open issue I see is to guard against the wrong corner case of union type with < 2 types which @rzhang10 pointed out
| return false; | ||
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get the optional vs required part, but what is the non-option union part?
can we rename to isUnionOptional?
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { | ||
| if (schema.getType() == UNION && schema.getTypes().size() != 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch @rzhang10 ! While it is meaningless to have a single-type in a union, the avro Schema.UnionSchema constructor does not perform any checks on the number of types, so we definitely need to guard against such cases (throw if size < 2 ?).
| int index = 0; | ||
| for (Schema branch : types) { | ||
| if (branch.getType() == Schema.Type.NULL) { | ||
| options.add(visit((Type) null, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type, branch, visitor)); | ||
| if (AvroSchemaUtil.isOptionSchema(union)) { | ||
| options.add(visit(type, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor)); | ||
| } | ||
| index++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we now maintain index outside the loop, and check if union is option (line 88) every time in the loop. May be use one loop for both branches and fields, and re-org to hoist up the union check (if that is feasible)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about re-assigning the type before the loop?
if (AvroSchemaUtil.isOptionSchema(union))
branches = type;
else
branches = type.asStructType();
| } else { | ||
| return options.get(0); | ||
| } | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { | ||
| if (schema.getType() == UNION && schema.getTypes().size() != 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should those be converted to structs or simple types?
| return false; | ||
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should rename this method. How about isOptionalComplexUnion?
| return false; | ||
| } | ||
|
|
||
| public static boolean isNonOptionalUnionOptional(Schema schema) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, since the input is an Avro schema, should not we use Avro terminology? Also something along the lines of isNullableComplexUnion should work probably.
| int index = 0; | ||
| for (Schema branch : types) { | ||
| if (branch.getType() == Schema.Type.NULL) { | ||
| options.add(visit((Type) null, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type, branch, visitor)); | ||
| if (AvroSchemaUtil.isOptionSchema(union)) { | ||
| options.add(visit(type, branch, visitor)); | ||
| } else { | ||
| options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor)); | ||
| } | ||
| index++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about re-assigning the type before the loop?
if (AvroSchemaUtil.isOptionSchema(union))
branches = type;
else
branches = type.asStructType();
|
|
||
| return null; | ||
| } else { | ||
| // case non option union |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/// Complex union case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. Modified.
| } | ||
|
|
||
| @Test | ||
| public void testNonOptionUnionNullable() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testOptionalComplexUnion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| } | ||
|
|
||
| @Test | ||
| public void testSingleOptionUnion() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
testSingleComponentUnion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| public TemporaryFolder temp = new TemporaryFolder(); | ||
|
|
||
| @Test | ||
| public void writeAndValidateNonOptionUnionNonNullable() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
RequiredComplexUnion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| } | ||
|
|
||
| @Test | ||
| public void writeAndValidateNonOptionUnionNullable() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OptionalComplexUnion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| } | ||
|
|
||
| @Test | ||
| public void writeAndValidateSingleOptionUnion() throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SingleComponent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
This PR supports reading Avro complex union types as Iceberg Struct type via
SparkAvroReader.For example,
[ "int", "string" ] => required struct ( 0 tag_0: optional int, 1 tag_1: optional string)[ "null", "int", "string" ] => optional struct ( 0 tag_0: optional int, 1 tag_1: optional string)Testing done: