diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 7065247ded7dd..88a95064f850d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -67,6 +67,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
   public static final String EXTRA_FIELD_SCHEMA =
       "{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
+  public static final String EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA =
+      "{\"name\": \"new_field_without_default\", \"type\": \"boolean\"},";
+  public static final String EXTRA_FIELD_NULLABLE_SCHEMA =
+      ",{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]}";
 
   // TRIP_EXAMPLE_SCHEMA with a new_field added
   public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
@@ -143,6 +147,16 @@ public void testSchemaCompatibilityBasic() throws Exception {
         + TRIP_SCHEMA_SUFFIX;
     assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
         "Multiple added fields with defauls are compatible");
+
+    assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
+        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
+        "Added field without default and not nullable is not compatible (Evolved Schema)");
+
+    assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
+        TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+            + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA),
+        "Added nullable field is compatible (Evolved Schema)");
   }
 
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index bf730e2129715..940e4ff1ea466 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -21,7 +21,6 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -29,6 +28,7 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.HoodieUpsertException
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -703,4 +703,85 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
   }
+
+  @Test def testSchemaEvolution(): Unit = {
+    // enable Avro schema validation on write
+    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
+    // 1. write records with schema1
+    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true) :: Nil)
+    val records1 = Seq(Row("1", "Andy", 1, 1),
+      Row("2", "lisi", 1, 1),
+      Row("3", "zhangsan", 1, 1))
+    val rdd = jsc.parallelize(records1)
+    val recordsDF = spark.createDataFrame(rdd, schema1)
+    recordsDF.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // 2. write records with schema2 (adds column "age")
+    val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
+      StructField("partition", IntegerType, true) :: Nil)
+
+    val records2 = Seq(Row("11", "Andy", "10", 1, 1),
+      Row("22", "lisi", "11", 1, 1),
+      Row("33", "zhangsan", "12", 1, 1))
+    val rdd2 = jsc.parallelize(records2)
+    val recordsDF2 = spark.createDataFrame(rdd2, schema2)
+    recordsDF2.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
+    assertEquals(resultSchema, schema2)
+
+    // 3. write records with schema3 (drops column "name"); this must be rejected
+    try {
+      val schema3 = StructType(StructField("_row_key", StringType, true) ::
+        StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
+        StructField("partition", IntegerType, true) :: Nil)
+
+      val records3 = Seq(Row("11", "10", 1, 1),
+        Row("22", "11", 1, 1),
+        Row("33", "12", 1, 1))
+      val rdd3 = jsc.parallelize(records3)
+      val recordsDF3 = spark.createDataFrame(rdd3, schema3)
+      recordsDF3.write.format("org.apache.hudi")
+        .options(opts)
+        .mode(SaveMode.Append)
+        .save(basePath)
+      fail("Delete column should fail")
+    } catch {
+      case ex: HoodieUpsertException =>
+        assertTrue(ex.getMessage.equals("Failed upsert schema compatibility check."))
+    }
+  }
+
+  @Test def testSchemaNotEqualData(): Unit = {
+    val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
+    // the declared schema carries an "age" column that the JSON records below do not contain
+    val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
+      StructField("timestamp", IntegerType, true) :: StructField("age", StringType, true) ::
+      StructField("partition", IntegerType, true) :: Nil)
+
+    val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
+      "{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")
+
+    val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))
+
+    inputDF.write.format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    val recordsReadDF = spark.read.format("org.apache.hudi")
+      .load(basePath + "/*/*")
+
+    val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
+    assertEquals(resultSchema, schema1)
+  }
 }