diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 0851ffa1c3..f6866088da 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -138,6 +138,16 @@ public static boolean isOptionSchema(Schema schema) { return false; } + /** + * This method decides whether a schema represents a single type union, i.e., a union that contains only one option + * + * @param schema input schema + * @return true if schema is single type union + */ + public static boolean isSingleTypeUnion(Schema schema) { + return schema.getType() == UNION && schema.getTypes().size() == 1; + } + /** * This method decides whether a schema is of type union and is complex union and is optional * diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java index 606529685f..6157482748 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaWithTypeVisitor.java @@ -89,6 +89,13 @@ private static T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisit options.add(visit(type, branch, visitor)); } } + } else if (AvroSchemaUtil.isSingleTypeUnion(union)) { // single type union case + Schema branch = types.get(0); + if (branch.getType() == Schema.Type.NULL) { + options.add(visit((Type) null, branch, visitor)); + } else { + options.add(visit(type, branch, visitor)); + } } else { // complex union case int index = 1; for (Schema branch : types) { diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java index 61e5cc9f92..3bb559b9a6 100644 --- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java +++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java @@ -116,6 +116,9 @@ public Type union(Schema union, List options) { } else { return options.get(0); } + } else if (AvroSchemaUtil.isSingleTypeUnion(union)) { + // Single type union + return options.get(0); } else { // Complex union List newFields = new ArrayList<>(); diff --git a/core/src/test/java/org/apache/iceberg/avro/TestUnionSchemaConversions.java b/core/src/test/java/org/apache/iceberg/avro/TestUnionSchemaConversions.java index 1ba6471735..bdbca9c5c1 100644 --- a/core/src/test/java/org/apache/iceberg/avro/TestUnionSchemaConversions.java +++ b/core/src/test/java/org/apache/iceberg/avro/TestUnionSchemaConversions.java @@ -74,7 +74,7 @@ public void testOptionalComplexUnion() { } @Test - public void testSimpleUnionSchema() { + public void testOptionalSingleUnionSchema() { Schema avroSchema = SchemaBuilder.record("root") .fields() .name("optionCol") @@ -92,4 +92,62 @@ public void testSimpleUnionSchema() { Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); } + + @Test + public void testSingleTypeUnionSchema() { + Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .intType() + .endUnion() + .noDefault() + .endRecord(); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + String expectedIcebergSchema = "table {\n" + " 0: unionCol: required int\n" + "}"; + + Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); + } + + @Test + public void testNestedSingleTypeUnionSchema() { + Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("col1") + .type() + .array() + .items() + .unionOf() + .stringType() + .endUnion() + .noDefault() + .endRecord(); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + String expectedIcebergSchema = "table {\n" + " 0: col1: required list\n" + "}"; + + Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); + } + + @Test + public void testSingleTypeUnionOfComplexTypeSchema() { + Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .array() + .items() + .intType() + .endUnion() + .noDefault() + .endRecord(); + + org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema); + String expectedIcebergSchema = "table {\n" + " 0: unionCol: required list\n" + "}"; + + Assert.assertEquals(expectedIcebergSchema, icebergSchema.toString()); + } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java index f467650261..75108053ee 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroReader.java @@ -80,7 +80,7 @@ public ValueReader record(Types.StructType expected, Schema record, List union(Type expected, Schema union, List> options) { - if (AvroSchemaUtil.isOptionSchema(union)) { + if (AvroSchemaUtil.isOptionSchema(union) || AvroSchemaUtil.isSingleTypeUnion(union)) { return ValueReaders.union(options); } else { return SparkValueReaders.union(union, options); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java index fa3ff37e42..eb17e45713 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroUnions.java @@ -141,6 +141,124 @@ public void writeAndValidateOptionalComplexUnion() throws IOException { @Test public void writeAndValidateSingleTypeUnion() throws IOException { + org.apache.avro.Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .intType() + .endUnion() + .noDefault() + .endRecord(); + + GenericData.Record unionRecord1 = new GenericData.Record(avroSchema); + unionRecord1.put("unionCol", 0); + GenericData.Record unionRecord2 = new GenericData.Record(avroSchema); + unionRecord2.put("unionCol", 1); + + File testFile = temp.newFile(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(avroSchema, testFile); + writer.append(unionRecord1); + writer.append(unionRecord2); + } + + Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema); + + List rows; + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(expectedSchema) + .build()) { + rows = Lists.newArrayList(reader); + + Assert.assertEquals(0, rows.get(0).getInt(0)); + Assert.assertEquals(1, rows.get(1).getInt(0)); + } + } + + @Test + public void writeAndValidateNestedSingleTypeUnion() throws IOException { + org.apache.avro.Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("col1") + .type() + .array() + .items() + .unionOf() + .stringType() + .endUnion() + .noDefault() + .endRecord(); + + GenericData.Record unionRecord1 = new GenericData.Record(avroSchema); + unionRecord1.put("col1", Arrays.asList("foo")); + GenericData.Record unionRecord2 = new GenericData.Record(avroSchema); + unionRecord2.put("col1", Arrays.asList("bar")); + + File testFile = temp.newFile(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(avroSchema, testFile); + writer.append(unionRecord1); + writer.append(unionRecord2); + } + + Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema); + + List rows; + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(expectedSchema) + .build()) { + rows = Lists.newArrayList(reader); + + Assert.assertEquals("foo", rows.get(0).getArray(0).getUTF8String(0).toString()); + Assert.assertEquals("bar", rows.get(1).getArray(0).getUTF8String(0).toString()); + } + } + + @Test + public void writeAndValidateSingleTypeUnionOfComplexType() throws IOException { + org.apache.avro.Schema avroSchema = SchemaBuilder.record("root") + .fields() + .name("unionCol") + .type() + .unionOf() + .array() + .items() + .intType() + .endUnion() + .noDefault() + .endRecord(); + + GenericData.Record unionRecord1 = new GenericData.Record(avroSchema); + unionRecord1.put("unionCol", Arrays.asList(1)); + GenericData.Record unionRecord2 = new GenericData.Record(avroSchema); + unionRecord2.put("unionCol", Arrays.asList(2)); + + File testFile = temp.newFile(); + try (DataFileWriter writer = new DataFileWriter<>(new GenericDatumWriter<>())) { + writer.create(avroSchema, testFile); + writer.append(unionRecord1); + writer.append(unionRecord2); + } + + Schema expectedSchema = AvroSchemaUtil.toIceberg(avroSchema); + + List rows; + try (AvroIterable reader = Avro.read(Files.localInput(testFile)) + .createReaderFunc(SparkAvroReader::new) + .project(expectedSchema) + .build()) { + rows = Lists.newArrayList(reader); + + Assert.assertEquals(1, rows.get(0).getArray(0).getInt(0)); + Assert.assertEquals(2, rows.get(1).getArray(0).getInt(0)); + } + } + + @Test + public void writeAndValidateOptionalSingleUnion() throws IOException { org.apache.avro.Schema avroSchema = SchemaBuilder.record("root") .fields() .name("unionCol")