Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
int idx = 0;
for (Schema type : types) {
if (type.getType() != Schema.Type.NULL) {
options.add(visitWithName("tag_" + idx, type, visitor));
options.add(visitWithName("field" + idx, type, visitor));
idx += 1;
} else {
options.add(visit(type, visitor));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,24 @@ private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit
List<Schema> types = union.getTypes();
List<T> options = Lists.newArrayListWithExpectedSize(types.size());

int index = 0;
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
if (AvroSchemaUtil.isOptionSchema(union)) {
// simple union case
if (AvroSchemaUtil.isOptionSchema(union)) {
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type, branch, visitor));
}
}
} else { // complex union case
int index = 1;
for (Schema branch : types) {
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type.asStructType().fields().get(index).type(), branch, visitor));
index += 1;
}
index++;
}
}
return visitor.union(type, union, options);
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iceberg.avro;

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
Expand Down Expand Up @@ -116,12 +117,13 @@ public Type union(Schema union, List<Type> options) {
}
} else {
// Complex union

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an official definition of what is complex and what is simple? If we aim to contribute this back upstream, it may be better to clarify that through Javadoc. If this is merely for internal usage, I am fine with what we have now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simple union is [sometype, null], while a complex union is [sometype1, sometype2, ...], where there are at least 2 non-null types in the union. I think we can probably add a Javadoc to AvroSchemaUtil.isOptionSchema, which is an existing method in upstream Iceberg.

List<Types.NestedField> newFields = Lists.newArrayListWithExpectedSize(options.size());
List<Types.NestedField> newFields = new ArrayList<>();
newFields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));

int tagIndex = 0;
for (Type type : options) {
if (type != null) {
newFields.add(Types.NestedField.optional(allocateId(), "tag_" + tagIndex++, type));
newFields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex++, type));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.junit.Test;


public class TestAvroComplexUnion {
public class TestUnionSchemaConversions {

@Test
public void testRequiredComplexUnion() {
Expand All @@ -43,7 +43,8 @@ public void testRequiredComplexUnion() {

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema = "table {\n" +
" 0: unionCol: required struct<1: tag_0: optional int, 2: tag_1: optional string>\n" + "}";
" 0: unionCol: required struct<1: tag: required int, 2: field0: optional int, 3: field1: optional string>\n" +
"}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}
Expand All @@ -65,32 +66,15 @@ public void testOptionalComplexUnion() {
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema =
"table {\n" + " 0: unionCol: optional struct<1: tag_0: optional int, 2: tag_1: optional string>\n" + "}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

@Test
public void testSingleComponentUnion() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
.type()
.unionOf()
.intType()
.endUnion()
.noDefault()
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema = "table {\n" + " 0: unionCol: required struct<1: tag_0: optional int>\n" + "}";
String expectedIcebergSchema = "table {\n" +
" 0: unionCol: optional struct<1: tag: required int, 2: field0: optional int, 3: field1: optional string>\n" +
"}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

@Test
public void testOptionSchema() {
public void testSimpleUnionSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("optionCol")
Expand All @@ -108,22 +92,4 @@ public void testOptionSchema() {

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this considered invalid at all? Or what is the reason for getting rid of this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this schema shouldn't appear in the first place; users shouldn't define this kind of schema.

// Builds a record whose single field "nullUnionCol" is a union containing only the null type,
// converts it with AvroSchemaUtil.toIceberg, and asserts that the printed Iceberg schema shows
// the column as an optional struct with no fields.
@Test
public void testNullUnionSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("nullUnionCol")
.type()
.unionOf()
.nullType()
.endUnion()
.noDefault()
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
// Expected rendering: field id 0, marked optional, with an empty struct type.
String expectedIcebergSchema = "table {\n" + " 0: nullUnionCol: optional struct<>\n" + "}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ protected void set(InternalRow struct, int pos, Object value) {
}
}

static class UnionReader implements ValueReader<InternalRow> {
private static class UnionReader implements ValueReader<InternalRow> {
private final Schema schema;
private final ValueReader[] readers;

Expand All @@ -316,20 +316,30 @@ public InternalRow read(Decoder decoder, Object reuse) throws IOException {
break;
}
}
InternalRow struct = new GenericInternalRow(nullIndex >= 0 ? alts.size() - 1 : alts.size());

int index = decoder.readIndex();
if (index == nullIndex) {
// if it is a null data, directly return null as the whole union result
return null;
}

// otherwise, we need to return an InternalRow as a struct data
InternalRow struct = new GenericInternalRow(nullIndex >= 0 ? alts.size() : alts.size() + 1);
for (int i = 0; i < struct.numFields(); i += 1) {
struct.setNullAt(i);
}

int index = decoder.readIndex();
Object value = this.readers[index].read(decoder, reuse);
Object value = readers[index].read(decoder, reuse);

if (nullIndex < 0) {
struct.update(index, value);
struct.update(index + 1, value);
struct.setInt(0, index);
} else if (index < nullIndex) {
struct.update(index + 1, value);
struct.setInt(0, index);
} else {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow this: what does the relative position of the value index and the null index have to do with how we assign the value in the struct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because in Avro the nullability of a union type is represented by the presence of a NULL type among the union alternatives, and the NULL type can occur at any position inside the union. In our converted struct, however, we don't have a field that corresponds to that NULL type, so the field count is reduced by one.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess what I didn't know earlier was whether, if a NULL type appears in the union, it has to be in the first position in the alternative list. But the Avro spec says, "Thus, for unions containing "null", the "null" is usually listed first."

Will this lead to a silent failure if you don't check this first?

Copy link
Member Author

@rzhang10 rzhang10 Dec 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's just a recommendation from the Avro spec, not a mandate. In fact, in our ecosystem there are many Avro schemas that do not put null as the first type element in the union.

And that's why I'm specifically computing the null index here and branching on it.

struct.update(index, value);
} else if (index > nullIndex) {
struct.update(index - 1, value);
struct.setInt(0, index - 1);
}

return struct;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ public void writeAndValidateRequiredComplexUnion() throws IOException {
unionRecord2.put("unionCol", 1);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
Expand All @@ -82,13 +80,15 @@ public void writeAndValidateRequiredComplexUnion() throws IOException {
.build()) {
rows = Lists.newArrayList(reader);

Assert.assertEquals(2, rows.get(0).getStruct(0, 2).numFields());
Assert.assertTrue(rows.get(0).getStruct(0, 2).isNullAt(0));
Assert.assertEquals("foo", rows.get(0).getStruct(0, 2).getString(1));
Assert.assertEquals(3, rows.get(0).getStruct(0, 3).numFields());
Assert.assertEquals(1, rows.get(0).getStruct(0, 3).getInt(0));
Assert.assertTrue(rows.get(0).getStruct(0, 3).isNullAt(1));
Assert.assertEquals("foo", rows.get(0).getStruct(0, 3).getString(2));

Assert.assertEquals(2, rows.get(1).getStruct(0, 2).numFields());
Assert.assertEquals(1, rows.get(1).getStruct(0, 2).getInt(0));
Assert.assertTrue(rows.get(1).getStruct(0, 2).isNullAt(1));
Assert.assertEquals(3, rows.get(1).getStruct(0, 3).numFields());
Assert.assertEquals(0, rows.get(1).getStruct(0, 3).getInt(0));
Assert.assertEquals(1, rows.get(1).getStruct(0, 3).getInt(1));
Assert.assertTrue(rows.get(1).getStruct(0, 3).isNullAt(2));
}
}

Expand Down Expand Up @@ -116,8 +116,6 @@ public void writeAndValidateOptionalComplexUnion() throws IOException {
unionRecord3.put("unionCol", null);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
Expand All @@ -134,10 +132,9 @@ public void writeAndValidateOptionalComplexUnion() throws IOException {
.build()) {
rows = Lists.newArrayList(reader);

Assert.assertEquals("foo", rows.get(0).getStruct(0, 2).getString(1));
Assert.assertEquals(1, rows.get(1).getStruct(0, 2).getInt(0));
Assert.assertTrue(rows.get(2).getStruct(0, 2).isNullAt(0));
Assert.assertTrue(rows.get(2).getStruct(0, 2).isNullAt(1));
Assert.assertEquals("foo", rows.get(0).getStruct(0, 3).getString(2));
Assert.assertEquals(1, rows.get(1).getStruct(0, 3).getInt(1));
Assert.assertTrue(rows.get(2).isNullAt(0));
}
}

Expand All @@ -161,8 +158,6 @@ public void writeAndValidateSingleTypeUnion() throws IOException {
unionRecord2.put("unionCol", 1);

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
Expand Down Expand Up @@ -207,8 +202,6 @@ public void testDeeplyNestedUnionSchema1() throws IOException {
unionRecord2.put("col1", Arrays.asList(2, "bar"));

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
Expand All @@ -225,7 +218,7 @@ public void testDeeplyNestedUnionSchema1() throws IOException {
rows = Lists.newArrayList(reader);

// making sure it reads the correctly nested structured data, based on the transformation from union to struct
Assert.assertEquals("foo", rows.get(0).getArray(0).getStruct(0, 2).getString(1));
Assert.assertEquals("foo", rows.get(0).getArray(0).getStruct(0, 3).getString(2));
}
}

Expand Down Expand Up @@ -265,8 +258,6 @@ public void testDeeplyNestedUnionSchema2() throws IOException {
outer.put("col1", Arrays.asList(inner));

File testFile = temp.newFile();
Assert.assertTrue("Delete should succeed", testFile.delete());

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(outer);
Expand All @@ -281,7 +272,7 @@ public void testDeeplyNestedUnionSchema2() throws IOException {
rows = Lists.newArrayList(reader);

// making sure it reads the correctly nested structured data, based on the transformation from union to struct
Assert.assertEquals(1, rows.get(0).getArray(0).getStruct(0, 2).getStruct(0, 1).getInt(0));
Assert.assertEquals(1, rows.get(0).getArray(0).getStruct(0, 3).getStruct(1, 1).getInt(0));
}
}
}