Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public static boolean isOptionSchema(Schema schema) {
return false;
}

/**
* This method decides whether a schema represents a single type union, i.e., a union that contains only one option
*
* @param schema input schema
* @return true if schema is single type union
*/
public static boolean isSingleTypeUnion(Schema schema) {
return schema.getType() == UNION && schema.getTypes().size() == 1;
}

/**
* This method decides whether a schema is of type union and is complex union and is optional
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit
options.add(visit(type, branch, visitor));
}
}
} else if (AvroSchemaUtil.isSingleTypeUnion(union)) { // single type union case
Schema branch = types.get(0);
if (branch.getType() == Schema.Type.NULL) {
options.add(visit((Type) null, branch, visitor));
} else {
options.add(visit(type, branch, visitor));
}
} else { // complex union case
int index = 1;
for (Schema branch : types) {
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ public Type union(Schema union, List<Type> options) {
} else {
return options.get(0);
}
} else if (AvroSchemaUtil.isSingleTypeUnion(union)) {
// Single type union
return options.get(0);
} else {
// Complex union
List<Types.NestedField> newFields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testOptionalComplexUnion() {
}

@Test
public void testSimpleUnionSchema() {
public void testOptionalSingleUnionSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("optionCol")
Expand All @@ -92,4 +92,62 @@ public void testSimpleUnionSchema() {

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

@Test
public void testSingleTypeUnionSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
.type()
.unionOf()
.intType()
.endUnion()
.noDefault()
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema = "table {\n" + " 0: unionCol: required int\n" + "}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

@Test
public void testNestedSingleTypeUnionSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("col1")
.type()
.array()
.items()
.unionOf()
.stringType()
.endUnion()
.noDefault()
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema = "table {\n" + " 0: col1: required list<string>\n" + "}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}

@Test
public void testSingleTypeUnionOfComplexTypeSchema() {
Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
.type()
.unionOf()
.array()
.items()
.intType()
.endUnion()
.noDefault()
.endRecord();

org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
String expectedIcebergSchema = "table {\n" + " 0: unionCol: required list<int>\n" + "}";

Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ValueReader<?> record(Types.StructType expected, Schema record, List<Stri

@Override
public ValueReader<?> union(Type expected, Schema union, List<ValueReader<?>> options) {
if (AvroSchemaUtil.isOptionSchema(union)) {
if (AvroSchemaUtil.isOptionSchema(union) || AvroSchemaUtil.isSingleTypeUnion(union)) {
return ValueReaders.union(options);
} else {
return SparkValueReaders.union(union, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,124 @@ public void writeAndValidateOptionalComplexUnion() throws IOException {

@Test
public void writeAndValidateSingleTypeUnion() throws IOException {
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
.type()
.unionOf()
.intType()
.endUnion()
.noDefault()
.endRecord();

GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
unionRecord1.put("unionCol", 0);
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
unionRecord2.put("unionCol", 1);

File testFile = temp.newFile();
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
writer.append(unionRecord2);
}

Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);

List<InternalRow> rows;
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
.createReaderFunc(SparkAvroReader::new)
.project(expectedSchema)
.build()) {
rows = Lists.newArrayList(reader);

Assert.assertEquals(0, rows.get(0).getInt(0));
Assert.assertEquals(1, rows.get(1).getInt(0));
}
}

@Test
public void writeAndValidateNestedSingleTypeUnion() throws IOException {
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("col1")
.type()
.array()
.items()
.unionOf()
.stringType()
.endUnion()
.noDefault()
.endRecord();

GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
unionRecord1.put("col1", Arrays.asList("foo"));
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
unionRecord2.put("col1", Arrays.asList("bar"));

File testFile = temp.newFile();
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
writer.append(unionRecord2);
}

Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);

List<InternalRow> rows;
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
.createReaderFunc(SparkAvroReader::new)
.project(expectedSchema)
.build()) {
rows = Lists.newArrayList(reader);

Assert.assertEquals("foo", rows.get(0).getArray(0).getUTF8String(0).toString());
Assert.assertEquals("bar", rows.get(1).getArray(0).getUTF8String(0).toString());
}
}

@Test
public void writeAndValidateSingleTypeUnionOfComplexType() throws IOException {
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
.type()
.unionOf()
.array()
.items()
.intType()
.endUnion()
.noDefault()
.endRecord();

GenericData.Record unionRecord1 = new GenericData.Record(avroSchema);
unionRecord1.put("unionCol", Arrays.asList(1));
GenericData.Record unionRecord2 = new GenericData.Record(avroSchema);
unionRecord2.put("unionCol", Arrays.asList(2));

File testFile = temp.newFile();
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new GenericDatumWriter<>())) {
writer.create(avroSchema, testFile);
writer.append(unionRecord1);
writer.append(unionRecord2);
}

Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema);

List<InternalRow> rows;
try (AvroIterable<InternalRow> reader = Avro.read(Files.localInput(testFile))
.createReaderFunc(SparkAvroReader::new)
.project(expectedSchema)
.build()) {
rows = Lists.newArrayList(reader);

Assert.assertEquals(1, rows.get(0).getArray(0).getInt(0));
Assert.assertEquals(2, rows.get(1).getArray(0).getInt(0));
}
}

@Test
public void writeAndValidateOptionalSingleUnion() throws IOException {
org.apache.avro.Schema avroSchema = SchemaBuilder.record("root")
.fields()
.name("unionCol")
Expand Down