From 48330dfcc178c4da1d28aa3797bc73166b3f8a6d Mon Sep 17 00:00:00 2001 From: Malini Mahalakshmi Venkatachari Date: Tue, 7 Jun 2022 16:56:25 -0700 Subject: [PATCH 1/3] Enhance the UT for testing required fields with default values --- ...rkAvroReaderForFieldsWithDefaultValue.java | 167 +++++++++++++----- 1 file changed, 121 insertions(+), 46 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java index 388397f0e7..7e7fecaac5 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java @@ -21,6 +21,8 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.avro.generic.GenericData; import org.apache.iceberg.Files; @@ -31,17 +33,17 @@ import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.types.Type; -import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; +import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; +import org.apache.spark.sql.catalyst.util.GenericArrayData; +import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.apache.avro.Schema.Type.INT; -import static org.apache.avro.Schema.Type.NULL; -import static org.apache.iceberg.spark.SparkSchemaUtil.convert; +import static org.apache.iceberg.spark.data.TestHelpers.assertEquals; public class TestSparkAvroReaderForFieldsWithDefaultValue { @@ -50,72 +52,145 @@ public class TestSparkAvroReaderForFieldsWithDefaultValue { @Test public void testAvroDefaultValues() throws IOException { - String indexFiledName = "index"; - String nullableFiledName = "optionalFieldWithDefault"; - String requiredFiledName = "requiredFieldWithDefault"; - int defaultValue = -1; - // write records with initial writeSchema - org.apache.avro.Schema writeSchema = org.apache.avro.Schema.createRecord("root", null, null, false, - ImmutableList.of(new org.apache.avro.Schema.Field(indexFiledName, org.apache.avro.Schema.create(INT), - null, null), new org.apache.avro.Schema.Field(nullableFiledName, - org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT), - org.apache.avro.Schema.create(NULL)), null, defaultValue))); + String writeSchemaString = "{\n" + + " \"namespace\": \"com.n1\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"n1\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"f1\",\n" + + " \"type\": \"string\",\n" + + " \"default\": \"foo\"\n" + + " }\n" + + " ]\n" + + "}"; + + org.apache.avro.Schema writeSchema = new org.apache.avro.Schema.Parser().parse(writeSchemaString); + org.apache.iceberg.Schema icebergWriteSchema = AvroSchemaUtil.toIceberg(writeSchema); - Schema icebergWriteSchema = AvroSchemaUtil.toIceberg(writeSchema); List expected = RandomData.generateList(icebergWriteSchema, 2, 0L); File testFile = temp.newFile(); Assert.assertTrue("Delete should succeed", testFile.delete()); + // write records with initial writeSchema try (FileAppender writer = Avro.write(Files.localOutput(testFile)) - .schema(icebergWriteSchema) - .named("test") - .build()) { + .schema(icebergWriteSchema) + .named("test") + .build()) { for (GenericData.Record rec : expected) { writer.add(rec); } } // evolve schema by adding a required field with default value - org.apache.avro.Schema evolvedSchema = org.apache.avro.Schema.createRecord("root", null, null, false, - ImmutableList.of(new org.apache.avro.Schema.Field(indexFiledName, org.apache.avro.Schema.create(INT), - null, null), - new org.apache.avro.Schema.Field(nullableFiledName, - org.apache.avro.Schema.createUnion(org.apache.avro.Schema.create(INT), - org.apache.avro.Schema.create(NULL)), null, defaultValue), - new org.apache.avro.Schema.Field(requiredFiledName, org.apache.avro.Schema.create(INT), null, defaultValue) - )); + String evolvedSchemaString = "{\n" + + " \"namespace\": \"com.n1\",\n" + + " \"type\": \"record\",\n" + + " \"name\": \"n1\",\n" + + " \"fields\": [\n" + + " {\n" + + " \"name\": \"f1\",\n" + + " \"type\": \"string\",\n" + + " \"default\": \"foo\"\n" + + " },\n" + + " {\n" + + " \"name\": \"f2\",\n" + + " \"type\": \"int\",\n" + + " \"default\": 1\n" + + " },\n" + + " {\n" + + " \"name\": \"f3\",\n" + + " \"type\": {\n" + + " \"type\": \"map\",\n" + + " \"values\" : \"int\"\n" + + " },\n" + + " \"default\": {\"a\": 1}\n" + + " },\n" + + " {\n" + + " \"name\": \"f4\",\n" + + " \"type\": {\n" + + " \"type\": \"array\",\n" + + " \"items\" : \"int\"\n" + + " },\n" + + " \"default\": [1, 2, 3]\n" + + " },\n" + + " {\n" + + " \"name\": \"f5\",\n" + + " \"type\": {\n" + + " \"type\": \"record\",\n" + + " \"name\": \"F5\",\n" + + " \"fields\" : [\n" + + " {\"name\": \"ff1\", \"type\": \"long\"},\n" + + " {\"name\": \"ff2\", \"type\": \"string\"}\n" + + " ]\n" + + " },\n" + + " \"default\": {\n" + + " \"ff1\": 999,\n" + + " \"ff2\": \"foo\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"name\": \"f6\",\n" + + " \"type\": {\n" + + " \"type\": \"map\",\n" + + " \"values\": {\n" + + " \"type\": \"array\",\n" + + " \"items\" : \"int\"\n" + + " }\n" + + " },\n" + + " \"default\": {\"key\": [1, 2, 3]}\n" + + " },\n" + + " {\n" + + " \"name\": \"f7\",\n" + + " \"type\": {\n" + + " \"type\": \"fixed\",\n" + + " \"name\": \"md5\",\n" + + " \"size\": 2\n" + + " },\n" + + " \"default\": \"FF\"\n" + + " }\n" + + " ]\n" + + "}"; + org.apache.avro.Schema evolvedSchema = new org.apache.avro.Schema.Parser().parse(evolvedSchemaString); - // read written rows with evolved schema List rows; Schema icebergReadSchema = AvroSchemaUtil.toIceberg(evolvedSchema); + // read written rows with evolved schema try (AvroIterable reader = Avro.read(Files.localInput(testFile)) - .createReaderFunc(SparkAvroReader::new) - .project(icebergReadSchema) - .build()) { + .createReaderFunc(SparkAvroReader::new) + .project(icebergReadSchema) + .build()) { rows = Lists.newArrayList(reader); } - // validate all rows, and all fields are read properly Assert.assertNotNull(rows); Assert.assertEquals(expected.size(), rows.size()); for (int row = 0; row < expected.size(); row++) { - GenericData.Record expectedRow = expected.get(row); InternalRow actualRow = rows.get(row); - List fields = icebergReadSchema.asStruct().fields(); - - for (int i = 0; i < fields.size(); i += 1) { - Object expectedValue = null; - if (i >= writeSchema.getFields().size() && fields.get(i).hasDefaultValue()) { - expectedValue = fields.get(i).getDefaultValue(); - } else if (i < writeSchema.getFields().size()) { - expectedValue = expectedRow.get(i); - } - Type fieldType = fields.get(i).type(); - Object actualValue = actualRow.isNullAt(i) ? null : actualRow.get(i, convert(fieldType)); - Assert.assertEquals(expectedValue, actualValue); - } + final InternalRow expectedFirstRow = new GenericInternalRow(7); + expectedFirstRow.update(0, UTF8String.fromString((String) expected.get(row).get(0))); + expectedFirstRow.update(1, 1); + expectedFirstRow.update(2, new ArrayBasedMapData( + new GenericArrayData(Arrays.asList(UTF8String.fromString("a"))), + new GenericArrayData(Arrays.asList(1)))); + expectedFirstRow.update(3, new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); + + final InternalRow nestedStructData = new GenericInternalRow(2); + nestedStructData.update(0, 999L); + nestedStructData.update(1, UTF8String.fromString("foo")); + expectedFirstRow.update(4, nestedStructData); + + List listOfLists = new ArrayList(1); + listOfLists.add(new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); + expectedFirstRow.update(5, new ArrayBasedMapData( + new GenericArrayData(Arrays.asList(UTF8String.fromString("key"))), + new GenericArrayData(listOfLists.toArray()))); + + byte[] objGUIDByteArr = "FF".getBytes("UTF-8"); + expectedFirstRow.update(6, objGUIDByteArr); + assertEquals(icebergReadSchema, actualRow, expectedFirstRow); } } } From 4054820ab58171c2b965cfd405d174e2b2ab0577 Mon Sep 17 00:00:00 2001 From: Malini Mahalakshmi Venkatachari Date: Sun, 19 Jun 2022 09:31:42 -0700 Subject: [PATCH 2/3] Addressed review comments --- ...rkAvroReaderForFieldsWithDefaultValue.java | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java index 7e7fecaac5..1bb5cba0b3 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java @@ -59,9 +59,8 @@ public void testAvroDefaultValues() throws IOException { " \"name\": \"n1\",\n" + " \"fields\": [\n" + " {\n" + - " \"name\": \"f1\",\n" + - " \"type\": \"string\",\n" + - " \"default\": \"foo\"\n" + + " \"name\": \"f0\",\n" + + " \"type\": \"string\"\n" + " }\n" + " ]\n" + "}"; @@ -91,6 +90,10 @@ public void testAvroDefaultValues() throws IOException { " \"name\": \"n1\",\n" + " \"fields\": [\n" + " {\n" + + " \"name\": \"f0\",\n" + + " \"type\": \"string\"\n" + + " },\n" + + " {\n" + " \"name\": \"f1\",\n" + " \"type\": \"string\",\n" + " \"default\": \"foo\"\n" + @@ -169,28 +172,30 @@ public void testAvroDefaultValues() throws IOException { Assert.assertEquals(expected.size(), rows.size()); for (int row = 0; row < expected.size(); row++) { InternalRow actualRow = rows.get(row); - final InternalRow expectedFirstRow = new GenericInternalRow(7); + final InternalRow expectedFirstRow = new GenericInternalRow(8); expectedFirstRow.update(0, UTF8String.fromString((String) expected.get(row).get(0))); - expectedFirstRow.update(1, 1); - expectedFirstRow.update(2, new ArrayBasedMapData( + expectedFirstRow.update(1, UTF8String.fromString("foo")); + expectedFirstRow.update(2, 1); + expectedFirstRow.update(3, new ArrayBasedMapData( new GenericArrayData(Arrays.asList(UTF8String.fromString("a"))), new GenericArrayData(Arrays.asList(1)))); - expectedFirstRow.update(3, new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); + expectedFirstRow.update(4, new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); final InternalRow nestedStructData = new GenericInternalRow(2); nestedStructData.update(0, 999L); nestedStructData.update(1, UTF8String.fromString("foo")); - expectedFirstRow.update(4, nestedStructData); + expectedFirstRow.update(5, nestedStructData); List listOfLists = new ArrayList(1); listOfLists.add(new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); - expectedFirstRow.update(5, new ArrayBasedMapData( + expectedFirstRow.update(6, new ArrayBasedMapData( new GenericArrayData(Arrays.asList(UTF8String.fromString("key"))), new GenericArrayData(listOfLists.toArray()))); byte[] objGUIDByteArr = "FF".getBytes("UTF-8"); - expectedFirstRow.update(6, objGUIDByteArr); + expectedFirstRow.update(7, objGUIDByteArr); assertEquals(icebergReadSchema, actualRow, expectedFirstRow); + } } } From a7c4f499a6e3d109c1842f4ade1f527d1781879f Mon Sep 17 00:00:00 2001 From: Malini Mahalakshmi Venkatachari Date: Fri, 24 Jun 2022 07:09:34 -0700 Subject: [PATCH 3/3] Addressed review comment --- ...rkAvroReaderForFieldsWithDefaultValue.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java index 1bb5cba0b3..a26181fdb8 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkAvroReaderForFieldsWithDefaultValue.java @@ -172,29 +172,29 @@ public void testAvroDefaultValues() throws IOException { Assert.assertEquals(expected.size(), rows.size()); for (int row = 0; row < expected.size(); row++) { InternalRow actualRow = rows.get(row); - final InternalRow expectedFirstRow = new GenericInternalRow(8); - expectedFirstRow.update(0, UTF8String.fromString((String) expected.get(row).get(0))); - expectedFirstRow.update(1, UTF8String.fromString("foo")); - expectedFirstRow.update(2, 1); - expectedFirstRow.update(3, new ArrayBasedMapData( + final InternalRow expectedRow = new GenericInternalRow(8); + expectedRow.update(0, UTF8String.fromString((String) expected.get(row).get(0))); + expectedRow.update(1, UTF8String.fromString("foo")); + expectedRow.update(2, 1); + expectedRow.update(3, new ArrayBasedMapData( new GenericArrayData(Arrays.asList(UTF8String.fromString("a"))), new GenericArrayData(Arrays.asList(1)))); - expectedFirstRow.update(4, new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); + expectedRow.update(4, new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); final InternalRow nestedStructData = new GenericInternalRow(2); nestedStructData.update(0, 999L); nestedStructData.update(1, UTF8String.fromString("foo")); - expectedFirstRow.update(5, nestedStructData); + expectedRow.update(5, nestedStructData); List listOfLists = new ArrayList(1); listOfLists.add(new GenericArrayData(ImmutableList.of(1, 2, 3).toArray())); - expectedFirstRow.update(6, new ArrayBasedMapData( + expectedRow.update(6, new ArrayBasedMapData( new GenericArrayData(Arrays.asList(UTF8String.fromString("key"))), new GenericArrayData(listOfLists.toArray()))); byte[] objGUIDByteArr = "FF".getBytes("UTF-8"); - expectedFirstRow.update(7, objGUIDByteArr); - assertEquals(icebergReadSchema, actualRow, expectedFirstRow); + expectedRow.update(7, objGUIDByteArr); + assertEquals(icebergReadSchema, actualRow, expectedRow); } }