21 changes: 21 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
@@ -154,6 +154,27 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}

/**
 * Returns whether the given schema is a union that is both complex and optional.
 *
 * Complex union: a union with more than two options
 * Optional: null is one of the options
 *
 * @param schema input schema
 * @return true if the schema is a complex union that includes null
 */
public static boolean isOptionalComplexUnion(Schema schema) {
if (schema.getType() == UNION && schema.getTypes().size() > 2) {
for (Schema type : schema.getTypes()) {
if (type.getType() == Schema.Type.NULL) {
return true;
}
}
}

return false;
}

static Schema toOption(Schema schema) {
if (schema.getType() == UNION) {
Preconditions.checkArgument(isOptionSchema(schema),
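The classification implemented by `isOptionalComplexUnion` can be sketched with plain Java, using branch type names as a hypothetical stand-in for Avro `Schema` objects (the real method walks `schema.getTypes()` and checks `Schema.Type.NULL`):

```java
import java.util.List;

public class OptionalComplexUnionSketch {
  // A union is "complex" when it has more than two branches, and "optional"
  // when one of those branches is null. ["null", "int"] is a plain option
  // schema; ["null", "int", "string"] is an optional complex union.
  static boolean isOptionalComplexUnion(List<String> branchTypes) {
    return branchTypes.size() > 2 && branchTypes.contains("null");
  }

  public static void main(String[] args) {
    System.out.println(isOptionalComplexUnion(List.of("null", "int", "string"))); // true
    System.out.println(isOptionalComplexUnion(List.of("null", "int")));           // false: simple option
    System.out.println(isOptionalComplexUnion(List.of("int", "string", "float"))); // false: no null branch
  }
}
```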
17 changes: 15 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
@@ -52,8 +52,21 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
case UNION:
List<Schema> types = schema.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
if (AvroSchemaUtil.isOptionSchema(schema)) {
for (Schema type : types) {
options.add(visit(type, visitor));
}
} else {
// complex union case
int nonNullIdx = 0;
for (Schema type : types) {
if (type.getType() != Schema.Type.NULL) {
options.add(visitWithName("field" + nonNullIdx, type, visitor));
nonNullIdx += 1;
} else {
options.add(visit(type, visitor));
Contributor: Why not visit with the field name?

}
}
}
return visitor.union(schema, options);

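The branch-naming scheme in the complex-union case above can be sketched in isolation — strings stand in for Avro schemas here; the actual visitor calls `visitWithName` for non-null branches:

```java
import java.util.ArrayList;
import java.util.List;

public class BranchNaming {
  // Non-null branches are numbered field0, field1, ... in order of appearance;
  // the null branch is visited without a name.
  static List<String> nameBranches(List<String> branchTypes) {
    List<String> names = new ArrayList<>();
    int nonNullIdx = 0;
    for (String type : branchTypes) {
      if (!"null".equals(type)) {
        names.add("field" + nonNullIdx++);
      } else {
        names.add("(unnamed null)");
      }
    }
    return names;
  }

  public static void main(String[] args) {
    System.out.println(nameBranches(List.of("int", "null", "string")));
    // [field0, (unnamed null), field1]
  }
}
```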
@@ -79,11 +79,30 @@ private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSchemaWithTypeVisitor<T> visitor) {
private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
List<Schema> types = union.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());

// simple union case
if (AvroSchemaUtil.isOptionSchema(union)) {
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type, branch, visitor));
}
}
} else { // complex union case
Preconditions.checkArgument(type instanceof Types.StructType,
"Cannot visit invalid Iceberg type: %s for Avro complex union type: %s", type, union);

Contributor @rdblue (May 11, 2022):

I think this should also check whether the schema with type visitor has the tag field. There's no guarantee that it does.

Along the same lines, what happens if the struct is projected or out of order? I'd prefer to look up the struct field for each option in the union by field ID, just like we do with struct fields. For a struct field, we get the field ID from the Avro schema and use that to find the corresponding field in the Iceberg struct.

If you end up using field IDs, I think the challenge is getting those field IDs in the Avro schema. I'm assuming that you're using NameMapping to work with the incoming Avro schemas, right? Can NameMapping be updated to map union fields?

If not, I think you'd want to align fields by using the field name from the Iceberg struct. For example, field1 would be the second branch (getTypes().get(1)) in the union.

Contributor:

How about aligning by the type? field_i of type x aligns to the option of type x, regardless of the order? Else, we can mandate that the struct is in the same order as the options order (and the types match), and throw an exception here if not. I think both require recursively visiting the types to check for equality, but should be doable. The latter is kind of implemented here already, but I guess it will fail when trying to match the children as opposed to failing when trying to match the union itself.

Contributor:

This PR is relevant too (still internal but will be brought upstream soon): linkedin/iceberg#108

Contributor:

> How about aligning by the type?

I think that's what we will need to do at some point, but this visitor assumes that both schemas have field IDs. I think for this, the right way to handle it is to get the field ID from the union type. It would mean rewriting the Avro schema ahead of time to look like this:

[
  "null",
  {"type": "int", "field-id": 34},
  {"type": "string", "field-id": 35}
]

That's why I'm wondering about how to attach the field IDs in the name mapping. In the name mapping, we could allow a nested level to represent the union. Names in that level could be types rather than names, so the mapping to produce the union above would be [ { "field-id": 34, "names": ["int"] }, { "field-id": 35, "names": ["string"] } ]. That works for simple types. For record, map, and array types we can use the simple type name as well, "record" or "map" or "array". That would support any union with just one option of each nested type. If you had more than one map in the union, it would fail. I think that's a reasonable starting place, though.

@wmoustafa, what do you think?

Contributor:

What about the second option, where we expect them to be in the same order? (see this PR to support missing fields in the case of projection pruning)? This approach will also make sure that the deep types also match.
For the above suggestion, I am a bit worried about using only the top level types since it will fail in unexpected places and could lead to cryptic error messages if things go wrong. Also, can we list the whole type hierarchy instead?

That said, does it need to be retrofit into name mapping? I feel we could implement it without extending the name mapping as long as we have functions to deeply compare types.

I think as a starting point, we could assume they must align in terms of order (but do not necessarily be the same, as we could skip some in the struct), and later we can implement deep type comparisons, which will relax the ordering expectation (which is also forward compatible with the type-based check).

Contributor:

> I am a bit worried about using only the top level types since it will fail in unexpected places and could lead to cryptic error messages if things go wrong

We can fail gracefully. For example, if there are two records, we throw an exception in name mapping that multiple records aren't supported. If you think this is a common case, we can go further to find a solution for arbitrary nested types. That isn't too hard, actually. We could do it based on the child field set, which would have to match or at least have some overlap.

I don't think that doing this by order is a good idea. That could easily lead to worse cases where we're returning the wrong data.

Contributor:

> I don't think that doing this by order is a good idea. That could easily lead to worse cases where we're returning the wrong data.

Is the concern because of an out-of-order schema (e.g., the reader schema or the expected schema)? @yiqiangin has tried both cases and an out of order reader schema throws an exception and an out of order expected schema still returns correct results (both after applying this patch to address missing fields/projection pruning, so we may need to take that into account).

Contributor:

The problem is that there can be identical branches in a union and order-based resolution could do this incorrectly. The name mapping approach allows this to be entirely by ID here, and the name mapping could do deeper validation because it has child field IDs for all nested types.


List<Types.NestedField> fields = type.asStructType().fields();
// start index from 1 because 0 is the tag field which doesn't exist in the original Avro schema
int index = 1;
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(fields.get(index).type(), branch, visitor));
index += 1;
}
}
}
return visitor.union(type, union, options);
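The alignment rule in `visitUnion` — the i-th non-null branch reads into struct field i+1, skipping the synthetic tag field — can be sketched standalone (strings stand in for Avro schemas and Iceberg types):

```java
import java.util.ArrayList;
import java.util.List;

public class BranchAlignment {
  // Pair each non-null union branch with the struct field it reads into.
  // Field 0 of the struct is the synthetic "tag" field, so the first non-null
  // branch aligns with field 1, the second with field 2, and so on.
  static List<String> align(List<String> branchTypes, List<String> structFields) {
    List<String> pairs = new ArrayList<>();
    int index = 1; // skip the tag field
    for (String branch : branchTypes) {
      if ("null".equals(branch)) {
        pairs.add("null -> (none)");
      } else {
        pairs.add(branch + " -> " + structFields.get(index++));
      }
    }
    return pairs;
  }

  public static void main(String[] args) {
    List<String> struct = List.of("tag", "field0", "field1");
    System.out.println(align(List.of("null", "int", "string"), struct));
    // [null -> (none), int -> field0, string -> field1]
  }
}
```

As the review thread below discusses, this positional alignment assumes the struct fields appear in the same order as the union branches; aligning by field ID or by type would relax that assumption.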
@@ -19,6 +19,7 @@

package org.apache.iceberg.avro;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -154,16 +155,48 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {

@Override
public Schema union(Schema union, Iterable<Schema> options) {
if (AvroSchemaUtil.isOptionSchema(union)) {

Contributor:

Should this be using the AvroSchemaWithType visitor? I don't think that was written when this was added, but I like the idea of aligning the types with the right visitor, rather than having so much logic here.

Author:

Are you suggesting refactoring BuildAvroProjection to use AvroSchemaWithTypeVisitor instead of AvroCustomOrderSchemaVisitor? If so, I agree. It will make projection implementation simpler. Do you think this can be done as a separate PR as it involves re-implementing logic out of scope of union support?

Schema nonNullOriginal = AvroSchemaUtil.fromOption(union);
Schema nonNullResult = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));

if (!Objects.equals(nonNullOriginal, nonNullResult)) {
return AvroSchemaUtil.toOption(nonNullResult);
}

return union;
} else { // Complex union
Preconditions.checkArgument(current instanceof Types.StructType,
"Incompatible projected type: %s for Avro complex union type: %s", current, union);

Types.StructType asStructType = current.asStructType();

long nonNullBranchesCount = union.getTypes().stream()
.filter(branch -> branch.getType() != Schema.Type.NULL).count();
Preconditions.checkState(asStructType.fields().size() > nonNullBranchesCount,
"Column projection on struct converted from Avro complex union type: %s is not supported", union);

Iterator<Schema> resultBranchIterator = options.iterator();

// start index at 1 because field 0 is the tag field, which doesn't exist in the original Avro schema
int index = 1;
List<Schema> resultBranches = Lists.newArrayListWithExpectedSize(union.getTypes().size());

try {
for (Schema originalBranch : union.getTypes()) {
if (originalBranch.getType() == Schema.Type.NULL) {
resultBranches.add(resultBranchIterator.next());
} else {
this.current = asStructType.fields().get(index).type();
resultBranches.add(resultBranchIterator.next());
index += 1;
}
}

return Schema.createUnion(resultBranches);
} finally {
this.current = asStructType;
}
}
}

@Override
49 changes: 33 additions & 16 deletions core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
@@ -119,25 +119,27 @@ public Schema record(Schema record, List<String> names, List<Schema> fields) {

@Override
public Schema union(Schema union, List<Schema> options) {
if (AvroSchemaUtil.isOptionSchema(union)) {
// case option union
Schema pruned = null;
if (options.get(0) != null) {
pruned = options.get(0);
} else if (options.get(1) != null) {
pruned = options.get(1);
}

if (pruned != null) {
if (!Objects.equals(pruned, AvroSchemaUtil.fromOption(union))) {
return AvroSchemaUtil.toOption(pruned);
}
return union;
}

return null;
} else {
// Complex union case
return copyUnion(union, options);
}
}

@Override
@@ -323,4 +325,19 @@ private static Schema.Field copyField(Schema.Field field, Schema newSchema, Inte
private static boolean isOptionSchemaWithNonNullFirstOption(Schema schema) {
return AvroSchemaUtil.isOptionSchema(schema) && schema.getTypes().get(0).getType() != Schema.Type.NULL;
}

// For primitive types the visit result is null, so we reuse the primitive type from the original
// schema; for nested types we use the visit result, because it carries content from the
// recursive calls.
private static Schema copyUnion(Schema record, List<Schema> visitResults) {
List<Schema> branches = Lists.newArrayListWithExpectedSize(visitResults.size());
for (int i = 0; i < visitResults.size(); i++) {
if (visitResults.get(i) == null) {
branches.add(record.getTypes().get(i));
} else {
branches.add(visitResults.get(i));
}
}
return Schema.createUnion(branches);
}
}
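The element-wise fallback in `copyUnion` can be sketched standalone, with lists of strings standing in for Avro schemas:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyUnionSketch {
  // Prefer the visit result when present; fall back to the original branch
  // when the visitor returned null (the primitive-type case noted above).
  static List<String> copyUnion(List<String> originalBranches, List<String> visitResults) {
    List<String> branches = new ArrayList<>();
    for (int i = 0; i < visitResults.size(); i++) {
      branches.add(visitResults.get(i) != null ? visitResults.get(i) : originalBranches.get(i));
    }
    return branches;
  }

  public static void main(String[] args) {
    List<String> original = Arrays.asList("int", "record-old", "string");
    List<String> visited = Arrays.asList(null, "record-pruned", null); // nulls for primitives
    System.out.println(copyUnion(original, visited));
    // [int, record-pruned, string]
  }
}
```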
28 changes: 21 additions & 7 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
@@ -93,7 +93,7 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {
Type fieldType = fieldTypes.get(i);
int fieldId = getId(field);

if (AvroSchemaUtil.isOptionSchema(field.schema()) || AvroSchemaUtil.isOptionalComplexUnion(field.schema())) {
newFields.add(Types.NestedField.optional(fieldId, field.name(), fieldType, field.doc()));
} else {
newFields.add(Types.NestedField.required(fieldId, field.name(), fieldType, field.doc()));
@@ -105,13 +105,27 @@ public Type record(Schema record, List<String> names, List<Type> fieldTypes) {

@Override
public Type union(Schema union, List<Type> options) {
if (AvroSchemaUtil.isOptionSchema(union)) {
// Optional simple union
// records, arrays, and maps will check nullability later
if (options.get(0) == null) {
return options.get(1);
} else {
return options.get(0);
}
} else {
// Complex union
List<Types.NestedField> newFields = Lists.newArrayList();
newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));

int tagIndex = 0;
for (Type type : options) {
if (type != null) {
newFields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex++, type));
}
}

return Types.StructType.of(newFields);
}
}
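For example, the complex union `["null", "int", "string"]` converts to the Iceberg struct `struct<tag: int required, field0: int optional, field1: string optional>`. A stdlib-only sketch of the mapping in `union` above, with branch type names standing in for the real schema and type objects:

```java
import java.util.ArrayList;
import java.util.List;

public class ComplexUnionToStruct {
  // Mirrors the complex-union branch of SchemaToType#union: a required tag
  // field followed by one optional fieldN per non-null branch, in order.
  static List<String> toStructFields(List<String> branchTypes) {
    List<String> fields = new ArrayList<>();
    fields.add("tag: int required");
    int tagIndex = 0;
    for (String type : branchTypes) {
      if (!"null".equals(type)) {
        fields.add("field" + tagIndex++ + ": " + type + " optional");
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    System.out.println(toStructFields(List.of("null", "int", "string")));
    // [tag: int required, field0: int optional, field1: string optional]
  }
}
```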

@@ -256,4 +256,121 @@ public void projectMapWithLessFieldInValueSchema() {
assertEquals("Unexpected value ID discovered on the projected map schema",
1, Integer.valueOf(actual.getProp(AvroSchemaUtil.VALUE_ID_PROP)).intValue());
}

@Test
public void projectUnionWithBranchSchemaUnchanged() {

final Type icebergType = Types.StructType.of(
Types.NestedField.required(0, "tag", Types.IntegerType.get()),
Types.NestedField.optional(1, "field0", Types.IntegerType.get()),
Types.NestedField.optional(2, "field1", Types.StringType.get())
);

final org.apache.avro.Schema expected = SchemaBuilder.unionOf()
.intType()
.and()
.stringType()
.endUnion();

final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap());

final Iterable<org.apache.avro.Schema> branches = expected.getTypes();

final org.apache.avro.Schema actual = testSubject.union(expected, branches);

assertEquals("Union projection produced undesired union schema",
expected, actual);
}

@Test
public void projectUnionWithTypePromotion() {

final Type icebergType = Types.StructType.of(
Types.NestedField.required(0, "tag", Types.IntegerType.get()),
Types.NestedField.optional(1, "field0", Types.LongType.get()),
Types.NestedField.optional(2, "field1", Types.StringType.get())
);

final org.apache.avro.Schema originalSchema = SchemaBuilder.unionOf()
.intType()
.and()
.stringType()
.endUnion();

// once projected onto iceberg schema, first branch of Avro union schema will be promoted from int to long
final org.apache.avro.Schema expected = SchemaBuilder.unionOf()
.longType()
.and()
.stringType()
.endUnion();

final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap());

final Iterable<org.apache.avro.Schema> branches = expected.getTypes();

final org.apache.avro.Schema actual = testSubject.union(originalSchema, branches);

assertEquals("Union projection produced undesired union schema",
expected, actual);
}

@Test
public void projectUnionWithExtraFieldInNestedType() {

final Type icebergType = Types.StructType.of(
Types.NestedField.required(0, "tag", Types.IntegerType.get()),
Types.NestedField.optional(1, "field0", Types.StringType.get()),
Types.NestedField.optional(2, "field1", Types.StructType.of(
Types.NestedField.optional(3, "c1", Types.IntegerType.get()),
Types.NestedField.optional(4, "c2", Types.StringType.get()),
Types.NestedField.optional(5, "c3", Types.StringType.get())
))
);

final org.apache.avro.Schema originalSchema = SchemaBuilder.unionOf()
.stringType()
.and()
.record("r")
.fields()
.name("c1")
.type()
.intType()
.noDefault()
.name("c2")
.type()
.stringType()
.noDefault()
.endRecord()
.endUnion();

// once projected onto iceberg schema, the avro schema will have an extra string column in struct within union
final org.apache.avro.Schema expected = SchemaBuilder.unionOf()
.stringType()
.and()
.record("r")
.fields()
.name("c1")
.type()
.intType()
.noDefault()
.name("c2")
.type()
.stringType()
.noDefault()
.name("c3")
.type()
.stringType()
.noDefault()
.endRecord()
.endUnion();

final BuildAvroProjection testSubject = new BuildAvroProjection(icebergType, Collections.emptyMap());

final Iterable<org.apache.avro.Schema> branches = expected.getTypes();

final org.apache.avro.Schema actual = testSubject.union(originalSchema, branches);

assertEquals("Union projection produced undesired union schema",
expected, actual);
}
}