From 4c3ff433b084603ab9eb74a4946df9af01b4a080 Mon Sep 17 00:00:00 2001
From: liwei
Date: Tue, 6 Apr 2021 22:29:22 +0800
Subject: [PATCH 1/3] [HUDI-1768] Spark datasource: support schema validation
 when adding a column

---
 .../hudi/client/TestTableSchemaEvolution.java | 14 +++
 .../common/table/TableSchemaResolver.java     |  9 +-
 .../hudi/functional/TestCOWDataSource.scala   | 88 ++++++++++++++++++-
 3 files changed, 108 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 7065247ded7d..cef70aefb896 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -67,6 +67,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
 
   public static final String EXTRA_FIELD_SCHEMA =
       "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
+  public static final String EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA =
+      "{\"name\": \"new_field_without_default\", \"type\": \"boolean\"},";
+  public static final String EXTRA_FIELD_NULLABLE_SCHEMA =
+      "{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]},";
 
   // TRIP_EXAMPLE_SCHEMA with a new_field added
   public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
@@ -143,6 +147,16 @@ public void testSchemaCompatibilityBasic() throws Exception {
         + TRIP_SCHEMA_SUFFIX;
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
         "Multiple added fields with defaults are compatible");
+
+    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
+        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
+        "Added field without default and not nullable is not compatible (Evolved Schema)");
+
+    assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
+        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_NULLABLE_SCHEMA + TRIP_SCHEMA_SUFFIX),
+        "Added nullable field is compatible (Evolved Schema)");
   }
 
   @Test
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 9467bfd96c34..2c5b62c8c236 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -56,6 +56,7 @@ public class TableSchemaResolver {
   private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
 
   private HoodieTableMetaClient metaClient;
+  private static Schema nullSchema = Schema.create(Schema.Type.NULL);
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
@@ -331,8 +332,12 @@ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
       final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
       if (oldSchemaField == null) {
         if (newSchemaField.defaultVal() == null) {
-          // C3: newly added field in newSchema does not have a default value
-          return false;
+          // For Spark datasource, a newly added field will be nullable and converted to an Avro union type with null (SchemaConverters.toAvroType)
+          if (!(newSchemaField.schema().getType().equals(Schema.Type.UNION)
+              && newSchemaField.schema().getTypes().contains(nullSchema))) {
+            // C3: newly added field in newSchema does not have a default value
+            return false;
+          }
         }
       }
     }
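The resolver change above hinges on one Avro detail: the Spark datasource marks a newly added column nullable, and a nullable column is encoded as a union containing the null schema, so such a field can be accepted even without an explicit default. Below is a minimal sketch (not part of the patch) of that rule against the patched isSchemaCompatible, using Avro's SchemaBuilder; the record and field shapes are illustrative rather than Hudi's real trip schema.

    import org.apache.avro.SchemaBuilder
    import org.apache.hudi.common.table.TableSchemaResolver

    object NullableUnionCompatibilitySketch {
      def main(args: Array[String]): Unit = {
        // Writer (table) schema: a single required field.
        val oldSchema = SchemaBuilder.record("trip").fields()
          .requiredString("_row_key")
          .endRecord()

        // Reader schema adds a field with no default, typed as a
        // ["boolean", "null"] union -- the shape SchemaConverters.toAvroType
        // produces for a nullable Spark column.
        val newSchema = SchemaBuilder.record("trip").fields()
          .requiredString("_row_key")
          .name("new_field_without_default").`type`()
            .unionOf().booleanType().and().nullType().endUnion()
          .noDefault()
          .endRecord()

        // Under the patched check this pair should be reported compatible;
        // the unit test above asserts the same against the full trip schema.
        println(TableSchemaResolver.isSchemaCompatible(oldSchema, newSchema))
      }
    }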
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 88ed65f89d49..7e46c658576e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -21,13 +21,13 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieUpsertException
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -669,4 +669,90 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
   }
+
+  @Test def testSchemaEvolution(): Unit = {
+    // enable schema validation
+    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
+    // 1. Write records with schema1
+    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true) :: Nil)
+    val records1 = Seq(Row("1", "Andy", 1, 1),
+      Row("2", "lisi", 1, 1),
+      Row("3", "zhangsan", 1, 1))
+    val rdd = jsc.parallelize(records1)
+    val recordsDF = spark.createDataFrame(rdd, schema1)
+    recordsDF.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+
+    // 2. Write records with schema2: add column age
+    val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
+      StructField("partition", IntegerType, true) :: Nil)
+
+    val records2 = Seq(Row("11", "Andy", "10", 1, 1),
+      Row("22", "lisi", "11", 1, 1),
+      Row("33", "zhangsan", "12", 1, 1))
+    val rdd2 = jsc.parallelize(records2)
+    val recordsDF2 = spark.createDataFrame(rdd2, schema2)
+    recordsDF2.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+    recordsReadDF.show(false)
+    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
+    assertEquals(resultSchema, schema2)
+
+    // 3. Write records with schema3: delete column name
+    try {
+      val schema3 = StructType(StructField("_row_key", StringType, true) ::
+        StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
+        StructField("partition", IntegerType, true) :: Nil)
+
+      val records3 = Seq(Row("11", "10", 1, 1),
+        Row("22", "11", 1, 1),
+        Row("33", "12", 1, 1))
+      val rdd3 = jsc.parallelize(records3)
+      val recordsDF3 = spark.createDataFrame(rdd3, schema3)
+      recordsDF3.write.format("org.apache.hudi")
+        .options(opts)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      fail("Delete column should fail")
+    } catch {
+      case ex: HoodieUpsertException =>
+        assertTrue(ex.getMessage.equals("Failed upsert schema compatibility check."))
+    }
+  }
+
+
+  @Test def testSchemaNotEqualData(): Unit = {
+    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
+    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("timestamp", IntegerType, true) :: StructField("age", StringType, true) :: StructField("partition", IntegerType, true) :: Nil)
+
+    val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
+      "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
+
+    val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
+    inputDF.show(false)
+
+    inputDF.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    inputDF.printSchema()
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+    recordsReadDF.show(false)
+
+    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
+    assertEquals(resultSchema, schema1)
+  }
 }
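The Scala tests depend on Spark producing exactly that union shape when a DataFrame with a new nullable column is written. A short sketch of the conversion itself, assuming the spark-avro module's org.apache.spark.sql.avro.SchemaConverters (the class the resolver comment cites) is on the classpath:

    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types._

    object NullableColumnToAvroSketch {
      def main(args: Array[String]): Unit = {
        // The evolved shape from the test: "age" is a nullable column.
        val schema2 = StructType(
          StructField("_row_key", StringType, nullable = true) ::
          StructField("age", StringType, nullable = true) :: Nil)

        // Each nullable column should come out as a [type, "null"] union
        // with no field default -- the case the resolver check keys on.
        println(SchemaConverters.toAvroType(schema2).toString(true))
      }
    }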
From 33049d3c0730824f062a586956e6857b512c3168 Mon Sep 17 00:00:00 2001
From: liwei
Date: Sun, 2 May 2021 10:53:22 +0800
Subject: [PATCH 2/3] Revert the TableSchemaResolver.java change, as #2765 has
 already fixed it

---
 .../apache/hudi/common/table/TableSchemaResolver.java | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 2c5b62c8c236..9467bfd96c34 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -56,7 +56,6 @@ public class TableSchemaResolver {
   private static final Logger LOG = LogManager.getLogger(TableSchemaResolver.class);
 
   private HoodieTableMetaClient metaClient;
-  private static Schema nullSchema = Schema.create(Schema.Type.NULL);
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
@@ -332,12 +331,8 @@ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
       final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
       if (oldSchemaField == null) {
         if (newSchemaField.defaultVal() == null) {
-          // For Spark datasource, a newly added field will be nullable and converted to an Avro union type with null (SchemaConverters.toAvroType)
-          if (!(newSchemaField.schema().getType().equals(Schema.Type.UNION)
-              && newSchemaField.schema().getTypes().contains(nullSchema))) {
-            // C3: newly added field in newSchema does not have a default value
-            return false;
-          }
+          // C3: newly added field in newSchema does not have a default value
+          return false;
         }
       }
     }
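With this revert the resolver is back to the strict C3 rule and, per the commit message, the nullable-field case is already covered by #2765. For reference, the union-with-null predicate that patch 1 introduced (and this patch removes) can be expressed standalone against plain Avro; a sketch, independent of Hudi:

    import org.apache.avro.{Schema, SchemaBuilder}

    object NullableUnionPredicateSketch {
      private val nullSchema = Schema.create(Schema.Type.NULL)

      // True when a field is declared as a union that admits null --
      // the shape the removed check looked for.
      def isNullableUnion(field: Schema.Field): Boolean =
        field.schema().getType == Schema.Type.UNION &&
          field.schema().getTypes.contains(nullSchema)

      def main(args: Array[String]): Unit = {
        val record = SchemaBuilder.record("r").fields()
          .name("f").`type`().unionOf().booleanType().and().nullType().endUnion()
          .noDefault()
          .endRecord()
        println(isNullableUnion(record.getField("f"))) // true
      }
    }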
From 9f863f754ba0537d230f6c2c0e605e8fbf858b91 Mon Sep 17 00:00:00 2001
From: liwei
Date: Sun, 2 May 2021 11:30:40 +0800
Subject: [PATCH 3/3] [HUDI-1768] Remove debug output from tests and fix the
 EXTRA_FIELD_NULLABLE_SCHEMA concatenation

---
 .../org/apache/hudi/client/TestTableSchemaEvolution.java     | 4 ++--
 .../scala/org/apache/hudi/functional/TestCOWDataSource.scala | 5 -----
 2 files changed, 2 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index cef70aefb896..88a95064f850 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -70,7 +70,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   public static final String EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA =
       "{\"name\": \"new_field_without_default\", \"type\": \"boolean\"},";
   public static final String EXTRA_FIELD_NULLABLE_SCHEMA =
-      "{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]},";
+      ",{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]}";
 
   // TRIP_EXAMPLE_SCHEMA with a new_field added
   public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
@@ -155,7 +155,7 @@ public void testSchemaCompatibilityBasic() throws Exception {
 
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
         TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
-            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_NULLABLE_SCHEMA + TRIP_SCHEMA_SUFFIX),
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA),
         "Added nullable field is compatible (Evolved Schema)");
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b59188d8dced..940e4ff1ea46 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -720,7 +720,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .mode(SaveMode.Append)
       .save(basePath)
 
-
     // 2. Write records with schema2: add column age
     val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
       StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
       StructField("partition", IntegerType, true) :: Nil)
@@ -738,7 +737,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
 
     val recordsReadDF = spark.read.format("org.apache.hudi")
       .load(basePath + "/*/*")
-    recordsReadDF.show(false)
     val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
     assertEquals(resultSchema, schema2)
 
@@ -774,17 +772,14 @@ class TestCOWDataSource extends HoodieClientTestBase {
       "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
 
     val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
-    inputDF.show(false)
 
     inputDF.write.format("org.apache.hudi")
       .options(opts)
       .mode(SaveMode.Append)
      .save(basePath)
-    inputDF.printSchema()
 
     val recordsReadDF = spark.read.format("org.apache.hudi")
       .load(basePath + "/*/*")
-    recordsReadDF.show(false)
 
     val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
     assertEquals(resultSchema, schema1)