@@ -67,6 +67,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {

public static final String EXTRA_FIELD_SCHEMA =
"{\"name\": \"new_field\", \"type\": \"boolean\", \"default\": false},";
public static final String EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA =
"{\"name\": \"new_field_without_default\", \"type\": \"boolean\"},";
public static final String EXTRA_FIELD_NULLABLE_SCHEMA =
",{\"name\": \"new_field_without_default\", \"type\": [\"boolean\", \"null\"]}";

// TRIP_EXAMPLE_SCHEMA with a new_field added
public static final String TRIP_EXAMPLE_SCHEMA_EVOLVED = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
@@ -143,6 +147,16 @@ public void testSchemaCompatibilityBasic() throws Exception {
+ TRIP_SCHEMA_SUFFIX;
assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema),
"Multiple added fields with defauls are compatible");

assertFalse(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX),
"Added field without default and not nullable is not compatible (Evolved Schema)");

assertTrue(TableSchemaResolver.isSchemaCompatible(TRIP_EXAMPLE_SCHEMA,
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA
+ FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA),
"Added nullable field is compatible (Evolved Schema)");
}
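Taken together, these assertions pin down the rule being enforced: a field added in the evolved schema must either declare a default value or be nullable (a union that includes null). A minimal standalone sketch of that rule follows, written in Scala to match the datasource test further down; the tiny hand-written schemas and the object name are illustrative, and only the isSchemaCompatible(String, String) call mirrors the assertions above:

import org.apache.hudi.common.table.TableSchemaResolver

object SchemaCompatibilitySketch {
  // Table (old) schema: a single required string field.
  val tableSchema =
    """{"type": "record", "name": "rec", "fields": [{"name": "id", "type": "string"}]}"""
  // Added field carrying a default: expected to be compatible, as asserted above.
  val addedWithDefault =
    """{"type": "record", "name": "rec", "fields": [{"name": "id", "type": "string"}, {"name": "flag", "type": "boolean", "default": false}]}"""
  // Added field with no default and no null branch: expected to be rejected.
  val addedWithoutDefault =
    """{"type": "record", "name": "rec", "fields": [{"name": "id", "type": "string"}, {"name": "flag", "type": "boolean"}]}"""
  // Added nullable field (union with null), mirroring EXTRA_FIELD_NULLABLE_SCHEMA: expected to be compatible.
  val addedNullable =
    """{"type": "record", "name": "rec", "fields": [{"name": "id", "type": "string"}, {"name": "flag", "type": ["boolean", "null"]}]}"""

  def main(args: Array[String]): Unit = {
    println(TableSchemaResolver.isSchemaCompatible(tableSchema, addedWithDefault))    // expected: true
    println(TableSchemaResolver.isSchemaCompatible(tableSchema, addedWithoutDefault)) // expected: false
    println(TableSchemaResolver.isSchemaCompatible(tableSchema, addedNullable))       // expected: true
  }
}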

@Test
@@ -21,14 +21,14 @@ import java.sql.{Date, Timestamp}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieInstant
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.testutils.RawTripTestPayload.deleteRecordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieUpsertException
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.Config
import org.apache.hudi.testutils.HoodieClientTestBase
@@ -703,4 +703,85 @@ class TestCOWDataSource extends HoodieClientTestBase {
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
}

@Test def testSchemaEvolution(): Unit = {
// enable schema validation on write
val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
// 1. write records with schema1
val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true)::
StructField("timestamp", IntegerType, true) :: StructField("partition", IntegerType, true)::Nil)
val records1 = Seq(Row("1", "Andy", 1, 1),
Row("2", "lisi", 1, 1),
Row("3", "zhangsan", 1, 1))
val rdd = jsc.parallelize(records1)
val recordsDF = spark.createDataFrame(rdd, schema1)
recordsDF.write.format("org.apache.hudi")
.options(opts)
.mode(SaveMode.Append)
.save(basePath)

// 2. write records with schema2 add column age
val schema2 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true) ::
StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
StructField("partition", IntegerType, true)::Nil)

val records2 = Seq(Row("11", "Andy", "10", 1, 1),
Row("22", "lisi", "11",1, 1),
Row("33", "zhangsan", "12", 1, 1))
val rdd2 = jsc.parallelize(records2)
val recordsDF2 = spark.createDataFrame(rdd2, schema2)
recordsDF2.write.format("org.apache.hudi")
Review comment (Contributor): So, without the fix in TableSchemaResolver, this line was failing?

Reply (Author): Yes, without the fix it throws a schema-incompatibility error.

.options(opts)
.mode(SaveMode.Append)
.save(basePath)

val recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*")
val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
assertEquals(resultSchema, schema2)

// 3. write records with schema3 delete column name
try {
val schema3 = StructType(StructField("_row_key", StringType, true) ::
StructField("age", StringType, true) :: StructField("timestamp", IntegerType, true) ::
StructField("partition", IntegerType, true)::Nil)

val records3 = Seq(Row("11", "10", 1, 1),
Row("22", "11",1, 1),
Row("33", "12", 1, 1))
val rdd3 = jsc.parallelize(records3)
val recordsDF3 = spark.createDataFrame(rdd3, schema3)
recordsDF3.write.format("org.apache.hudi")
.options(opts)
.mode(SaveMode.Append)
.save(basePath)
fail("Delete column should fail")
} catch {
case ex: HoodieUpsertException =>
assertTrue(ex.getMessage.equals("Failed upsert schema compatibility check."))
}
}
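Beyond comparing the read-back DataFrame schema, the point raised in the review exchange above can also be checked against the Avro schema Hudi actually committed. This is a hedged sketch, assuming the metaClient field inherited from HoodieClientTestBase points at basePath and that this Hudi version exposes HoodieTableMetaClient.reload and TableSchemaResolver.getTableAvroSchema (neither call is part of this change):

// Sketch only: confirm the committed table schema now carries the added "age" column.
val refreshedMetaClient = HoodieTableMetaClient.reload(metaClient) // assumption: metaClient targets basePath
val committedSchema = new TableSchemaResolver(refreshedMetaClient).getTableAvroSchema
assertTrue(committedSchema.getFields.asScala.exists(_.name() == "age"))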


@Test def testSchemaNotEqualData(): Unit = {
Review comment (Contributor): Can you confirm that my understanding of this test case is right: when the ingested batch has fewer columns than the schema passed in, the write should still succeed, and the table schema should be the passed-in schema rather than the ingested DataFrame's schema.

Reply (Author): Yes, this unit test checks that the Spark datasource fills the missing column with null when the incoming data has no value for it.

val opts = commonOpts ++ Map("hoodie.avro.schema.validate" -> "true")
val schema1 = StructType(StructField("_row_key", StringType, true) :: StructField("name", StringType, true)::
StructField("timestamp", IntegerType, true):: StructField("age", StringType, true) :: StructField("partition", IntegerType, true)::Nil)

val records = Array("{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}",
"{\"_row_key\":\"1\",\"name\":\"lisi\",\"timestamp\":1,\"partition\":1}")

val inputDF = spark.read.schema(schema1.toDDL).json(spark.sparkContext.parallelize(records, 2))

inputDF.write.format("org.apache.hudi")
.options(opts)
.mode(SaveMode.Append)
.save(basePath)

val recordsReadDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*")

val resultSchema = new StructType(recordsReadDF.schema.filter(p => !p.name.startsWith("_hoodie")).toArray)
assertEquals(resultSchema, schema1)
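
// (Sketch, not part of the original change: per the review reply above, the datasource is
// expected to fill the missing "age" column with null, so a data-level check on top of the
// schema comparison could look like this.)
assertEquals(0L, recordsReadDF.filter(recordsReadDF("age").isNotNull).count())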
}
}