Support non-optional union types for Avro #4242
funcheetah wants to merge 13 commits into apache:main from
Conversation
shardulm94 left a comment
Thanks @funcheetah for the work here! Being able to represent unions as a struct really helps seamlessly migrate the Hive tables in our ecosystem to Iceberg without having to restate all historical data.
I think this PR is a good starting step towards the goal. However, for ease of reviewing, can we split the PR into two, one for Avro and one for ORC? I think we can finish the Avro side before moving on to ORC. I have made a preliminary pass over the Avro changes below.
There are a couple of TODOs mentioned in the PR description, but I think more may be required for completeness and consistency:
- Support in non-Spark environments (e.g. iceberg-data, flink, hive, etc.)
- Support for schema pruning within a complex union
These can be added gradually, but they should be noted in the PR description, and we should create separate issues for them.
@RussellSpitzer @rdblue Should we create a new Project in Github to track this effort? There will be multiple PRs required to complete this work.
Resolved review threads:
- core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java (outdated)
- spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java
List<Types.NestedField> newFields = new ArrayList<>();
newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));

int tagIndex = 0;
for (Type type : options) {
  if (type != null) {
    newFields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex++, type));
  }
}
Can you add the reasoning behind why the field names tag and field${index} were chosen when converting to struct? Spark's Avro data source uses member${index}, while Hive's extract_union UDF uses tag_${index}.
The struct schema converted from a complex union is chosen to be consistent with the Trino representation as implemented in this PR: trinodb/trino#3483
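To illustrate the representation concretely, here is a hypothetical sketch (not code from this PR): a value of the Avro union ["int", "string"] maps into the converted struct<tag int, field0 int, field1 string> by setting tag to the matched branch index and leaving the other option fields null.

```java
// Hypothetical illustration (not code from this PR): how a value of the Avro
// union ["int", "string"] is represented in the converted
// struct<tag int, field0 int, field1 string>.
import java.util.Arrays;
import java.util.List;

public class UnionToStructExample {
    // branchIndex is the position of the matched branch among the union's
    // options; all other option fields stay null.
    static List<Object> toStruct(int branchIndex, Object value, int optionCount) {
        Object[] row = new Object[optionCount + 1]; // +1 for the leading tag field
        row[0] = branchIndex;                       // tag records which branch matched
        row[branchIndex + 1] = value;               // field<branchIndex> holds the value
        return Arrays.asList(row);
    }

    public static void main(String[] args) {
        // "hello" matches branch 1 (string), so tag=1, field0=null, field1="hello"
        System.out.println(toStruct(1, "hello", 2)); // [1, null, hello]
    }
}
```

This matches the Trino-style encoding: exactly one of the option fields is non-null, and the tag disambiguates branches that hold the same Java value.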
Thanks a lot for the review @shardulm94! We can focus on reviewing Avro in this PR and open another PR for ORC. Regarding tracking of follow-up PRs, what is the best way for us to do so? Creating a project?

Created PR #4654 for ORC. We can focus on reviewing Avro in this PR. cc: @wmoustafa @shardulm94
Resolved review threads:
- spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueReaders.java (outdated, multiple threads)
- core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java (outdated)
- core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java
- core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java (outdated)
- core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java (outdated)
Can you address the checkstyle failures reported by the build? https://github.com/apache/iceberg/runs/6284396922?check_suite_focus=true

Thanks. Checkstyle failures are fixed.

Thank you @shardulm94 for the review! @rdblue could you take a look? @shardulm94 has already approved. If you do not have any other concerns, we can go ahead and merge it.
  options.add(visitWithName("field" + nonNullIdx, type, visitor));
  nonNullIdx += 1;
} else {
  options.add(visit(type, visitor));
Why not visit with the field name?
if (branch.getType() == Schema.Type.NULL) {
  options.add(visit((Type) null, branch, visitor));
} else {
  options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor));
Minor: You can move type.asStructType().fields() out of the loop, just after the precondition.
Thanks for the suggestion. Updated the code as suggested.
Preconditions.checkArgument(type instanceof Types.StructType,
    "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);

// we start index from 1 because 0 is the tag field which doesn't exist in the original Avro schema
Style: In Iceberg, avoid using personal pronouns ("I" and "we") in comments or documentation. Pronouns don't make docs more clear, they actually make them less direct. Here, you can use "start at index 1 because ...".
Thanks for the suggestion. Updated the comments as suggested.
  }
} else { // complex union case
  Preconditions.checkArgument(type instanceof Types.StructType,
      "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);
I think this should also check whether the schema with type visitor has the tag field. There's no guarantee that it does.
Along the same lines, what happens if the struct is projected or out of order? I'd prefer to look up the struct field for each option in the union by field ID, just like we do with struct fields. For a struct field, we get the field ID from the Avro schema and use that to find the corresponding field in the Iceberg struct.
If you end up using field IDs, I think the challenge is getting those field IDs in the Avro schema. I'm assuming that you're using NameMapping to work with the incoming Avro schemas, right? Can NameMapping be updated to map union fields?
If not, I think you'd want to align fields by using the field name from the Iceberg struct. For example, field1 would be the second branch (getTypes().get(1)) in the union.
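A hypothetical sketch of this name-based alignment, following the PR's convention of numbering only the non-null branches: struct field "field<i>" resolves to the i-th non-null branch of the union.

```java
// Hypothetical sketch of aligning struct fields to union branches by name,
// numbering only the non-null branches as this PR does: struct field
// "field<i>" resolves to the i-th non-null branch of the union.
import java.util.List;

public class UnionFieldAlignmentSketch {
    // unionBranches holds the union's branch type names, e.g. ["null", "int", "string"];
    // returns the index of the branch in the original union that "field<i>" aligns to.
    static int branchForField(List<String> unionBranches, String structFieldName) {
        int target = Integer.parseInt(structFieldName.substring("field".length()));
        int nonNullIdx = 0;
        for (int i = 0; i < unionBranches.size(); i++) {
            if (!"null".equals(unionBranches.get(i))) {
                if (nonNullIdx == target) {
                    return i;
                }
                nonNullIdx++;
            }
        }
        throw new IllegalArgumentException("No union branch for " + structFieldName);
    }

    public static void main(String[] args) {
        // In ["null", "int", "string"], field1 aligns to the string branch at index 2
        System.out.println(branchForField(List.of("null", "int", "string"), "field1")); // 2
    }
}
```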
How about aligning by the type? field_i of type x aligns to the option of type x, regardless of the order? Else, we can mandate that the struct is in the same order as the options order (and the types match), and throw an exception here if not. I think both require recursively visiting the types to check for equality, but should be doable. The latter is kind of implemented here already, but I guess it will fail when trying to match the children as opposed to failing when trying to match the union itself.
This PR is relevant too (still internal but will be brought upstream soon): linkedin/iceberg#108
> How about aligning by the type?
I think that's what we will need to do at some point, but this visitor assumes that both schemas have field IDs. I think for this, the right way to handle it is to get the field ID from the union type. It would mean rewriting the Avro schema ahead of time to look like this:

[
  "null",
  {"type": "int", "field-id": 34},
  {"type": "string", "field-id": 35}
]

That's why I'm wondering about how to attach the field IDs in the name mapping. In the name mapping, we could allow a nested level to represent the union. Names in that level could be types rather than names, so the mapping to produce the union above would be:

[
  { "field-id": 34, "names": ["int"] },
  { "field-id": 35, "names": ["string"] }
]

That works for simple types. For record, map, and array types we can use the simple type name as well: "record", "map", or "array". That would support any union with just one option of each nested type. If you had more than one map in the union, it would fail. I think that's a reasonable starting place, though.
@wmoustafa, what do you think?
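A hypothetical sketch of that mapping idea, using plain strings instead of Iceberg's NameMapping classes: each branch type name maps to a field ID, and duplicate type names are rejected, since a union with, say, two records could not be disambiguated this way.

```java
// Hypothetical sketch (plain strings, not Iceberg's NameMapping classes) of
// mapping union branch type names to field IDs; a union with two branches of
// the same type name is rejected because it cannot be disambiguated by name.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnionNameMappingSketch {
    // e.g. buildMapping(["int", "string"], [34, 35]) -> {"int": 34, "string": 35}
    static Map<String, Integer> buildMapping(List<String> branchTypeNames, List<Integer> fieldIds) {
        Map<String, Integer> mapping = new HashMap<>();
        for (int i = 0; i < branchTypeNames.size(); i++) {
            Integer previous = mapping.put(branchTypeNames.get(i), fieldIds.get(i));
            if (previous != null) {
                throw new IllegalArgumentException(
                    "Cannot map union: more than one branch of type " + branchTypeNames.get(i));
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        Map<String, Integer> mapping = buildMapping(List.of("int", "string"), List.of(34, 35));
        System.out.println(mapping.get("string")); // 35
    }
}
```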
What about the second option, where we expect them to be in the same order (see this PR to support missing fields in the case of projection pruning)? This approach will also make sure that the deep types match.
For the above suggestion, I am a bit worried about using only the top level types since it will fail in unexpected places and could lead to cryptic error messages if things go wrong. Also, can we list the whole type hierarchy instead?
That said, does it need to be retrofit into name mapping? I feel we could implement it without extending the name mapping as long as we have functions to deeply compare types.
I think as a starting point, we could assume they must align in terms of order (but do not necessarily be the same, as we could skip some in the struct), and later we can implement deep type comparisons, which will relax the ordering expectation (which is also forward compatible with the type-based check).
> I am a bit worried about using only the top level types since it will fail in unexpected places and could lead to cryptic error messages if things go wrong
We can fail gracefully. For example, if there are two records, we throw an exception in name mapping that multiple records aren't supported. If you think this is a common case, we can go further to find a solution for arbitrary nested types. That isn't too hard, actually. We could do it based on the child field set, which would have to match or at least have some overlap.
I don't think that doing this by order is a good idea. That could easily lead to worse cases where we're returning the wrong data.
> I don't think that doing this by order is a good idea. That could easily lead to worse cases where we're returning the wrong data.
Is the concern because of an out-of-order schema (e.g., the reader schema or the expected schema)? @yiqiangin has tried both cases: an out-of-order reader schema throws an exception, and an out-of-order expected schema still returns correct results (both after applying this patch to address missing fields/projection pruning, so we may need to take that into account).
The problem is that there can be identical branches in a union and order-based resolution could do this incorrectly. The name mapping approach allows this to be entirely by ID here, and the name mapping could do deeper validation because it has child field IDs for all nested types.
    "Invalid schema: non-option unions are not supported: %s", union);
Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));
if (AvroSchemaUtil.isOptionSchema(union)) {
Should this be using the AvroSchemaWithType visitor? I don't think that was written when this was added, but I like the idea of aligning the types with the right visitor, rather than having so much logic here.
There was a problem hiding this comment.
Are you suggesting refactoring BuildAvroProjection to use AvroSchemaWithTypeVisitor instead of AvroCustomOrderSchemaVisitor? If so, I agree. It will make projection implementation simpler. Do you think this can be done as a separate PR as it involves re-implementing logic out of scope of union support?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.
Summary
Apache Iceberg does not support non-optional union types (e.g. ["int", "string"]), nor does Apache Spark. This PR enables Iceberg to read non-optional union types by converting them into struct representations for the Apache Avro format.
Representation
The struct representations converted from non-optional union types are consistent with the non-optional union support added to Trino in trinodb/trino#3483.
Deeply nested non-optional union types are supported.
Examples
Basic
["int", "string"] -> struct<tag int, field0 int, field1 string>
Single type
["int"] -> int
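The conversion rule in these examples can be sketched as follows (a hypothetical helper operating on type-name strings, not Iceberg's actual Types API): a single-type union collapses to its only branch, while a multi-type union becomes a struct with a tag field plus one field per branch.

```java
// Hypothetical helper on type-name strings (not Iceberg's actual Types API)
// applying the conversion rule from the examples above: a single-type union
// collapses to its only branch; a multi-type union becomes a struct with a
// tag field plus one field per branch.
import java.util.List;
import java.util.StringJoiner;

public class UnionSchemaConversionSketch {
    static String convertUnion(List<String> branches) {
        if (branches.size() == 1) {
            return branches.get(0); // ["int"] -> int
        }
        StringJoiner struct = new StringJoiner(", ", "struct<", ">");
        struct.add("tag int");
        for (int i = 0; i < branches.size(); i++) {
            struct.add("field" + i + " " + branches.get(i));
        }
        return struct.toString();
    }

    public static void main(String[] args) {
        System.out.println(convertUnion(List.of("int", "string")));
        // struct<tag int, field0 int, field1 string>
    }
}
```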
TODO
- Handle single type union (e.g. ["int"]) as a primitive type
- Support in non-Spark environments (e.g. iceberg-data, flink, hive, etc.)
- Support for schema pruning within a complex union