diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index db1ca6f94c3f6..b1596f700ab2e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -21,7 +21,6 @@ package org.apache.hudi
 import java.nio.ByteBuffer
 import java.sql.{Date, Timestamp}
 import java.util
-
 import org.apache.avro.Conversions.DecimalConversion
 import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
@@ -340,7 +339,7 @@ object AvroConversionHelper {
         }
       }
       case structType: StructType =>
-        val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+        val schema: Schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, recordNamespace)
         val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 88101265de297..94d9cb5641f56 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -18,7 +18,7 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema
+import org.apache.avro.{JsonProperties, Schema}
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.spark.rdd.RDD
@@ -27,6 +27,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 object AvroConversionUtils {
 
@@ -49,13 +50,45 @@ object AvroConversionUtils {
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
   }
 
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
     SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
   }
 
+  /**
+   * Regenerate the Avro schema with proper nullable default values. Avro expects null to be the first entry of a UNION so that
+   * the default value can be set to null.
+   * @param writeSchema original writer schema.
+   * @return the regenerated schema with proper defaults set.
+   */
+  def getAvroSchemaWithDefaults(writeSchema: Schema): Schema = {
+    val modifiedFields = writeSchema.getFields.map(field => {
+      field.schema().getType match {
+        case Schema.Type.RECORD => {
+          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          new Schema.Field(field.name(), newSchema, field.doc(), JsonProperties.NULL_VALUE)
+        }
+        case Schema.Type.UNION => {
+          val innerFields = field.schema().getTypes
+          val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
+          if (containsNullSchema) {
+            val newSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+            val newSchemaField = new Schema.Field(field.name(), newSchema, field.doc(), JsonProperties.NULL_VALUE)
+            newSchemaField
+          } else {
+            new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())
+          }
+        }
+        case _ => new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal())
+      }
+    }).toList
+    val newSchema = Schema.createRecord(writeSchema.getName, writeSchema.getDoc, writeSchema.getNamespace, writeSchema.isError)
+    newSchema.setFields(modifiedFields)
+    newSchema
+  }
+
   def buildAvroRecordBySchema(record: IndexedRecord,
                               requiredSchema: Schema,
                               requiredPos: List[Int],
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index b0bb509962333..ebf23242a5279 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -45,6 +45,10 @@ public static Schema getStructTypeExampleSchema() throws IOException {
     return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
   }
 
+  public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
+    return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
+  }
+
   public static List<Row> generateRandomRows(int count) {
     Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public static List<Row> generateRandomRows(int count) {
     }
     return toReturn;
   }
+
+  public static List<Row> generateUpdates(List<Row> records, int count) {
+    List<Row> toReturn = new ArrayList<>();
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[3];
+      values[0] = records.get(i).getString(0);
+      values[1] = records.get(i).getAs(1);
+      values[2] = new Date().getTime();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
+
+  public static List<Row> generateRandomRowsEvolvedSchema(int count) {
+    Random random = new Random();
+    List<Row> toReturn = new ArrayList<>();
+    List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[4];
+      values[0] = UUID.randomUUID().toString();
+      values[1] = partitions.get(random.nextInt(3));
+      values[2] = new Date().getTime();
+      values[3] = UUID.randomUUID().toString();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
new file mode 100644
index 0000000000000..7498cd611b0c6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "namespace": "example.schema",
+  "type": "record",
+  "name": "trip",
+  "fields": [
+    {
+      "name": "_row_key",
+      "type": "string"
+    },
+    {
+      "name": "partition",
+      "type": "string"
+    },
+    {
+      "name": "ts",
+      "type": ["long", "null"]
+    },
+    {
+      "name": "new_field",
+      "type": ["string","null"]
+    }
+  ]
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index bbaeea1086e25..0f23e7815442a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -17,13 +17,10 @@
 
 package org.apache.hudi.functional
 
-import java.time.Instant
-import java.util
-import java.util.{Collections, Date, UUID}
-
 import org.apache.commons.io.FileUtils
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap}
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload}
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
@@ -34,10 +31,14 @@ import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOpt
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
+import java.time.Instant
+import java.util
+import java.util.{Collections, Date, UUID}
 import scala.collection.JavaConversions._
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -113,6 +114,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     }
   }
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      test("test schema evolution for " + tableType) {
+        initSparkContext("test_schema_evolution")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl"
+          // create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            "hoodie.insert.shuffle.parallelism" -> "1",
+            "hoodie.upsert.shuffle.parallelism" -> "1",
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          var schema = DataSourceTestUtils.getStructTypeExampleSchema
+          var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          var records = DataSourceTestUtils.generateRandomRows(10)
+          var recordsSeq = convertRowListToSeq(records)
+          var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+
+          val snapshotDF1 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF1.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          assert(df1.except(trimmedDf1).count() == 0)
+
+          // issue updates so that log files are created for MOR table
+          var updates = DataSourceTestUtils.generateUpdates(records, 5);
+          var updatesSeq = convertRowListToSeq(updates)
+          var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+
+          val snapshotDF2 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(10, snapshotDF2.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf2 = snapshotDF2.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          // ensure 2nd batch of updates matches.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+          schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+          structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
+          recordsSeq = convertRowListToSeq(records)
+          val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+
+          val snapshotDF3 = spark.read.format("org.apache.hudi")
+            .load(path.toAbsolutePath.toString + "/*/*/*/*")
+          assertEquals(15, snapshotDF3.count())
+
+          // remove metadata columns so that expected and actual DFs can be compared as is
+          val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+            .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+          // ensure the 3rd batch of records (evolved schema) matches.
+          assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
+
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
+
   test("test bulk insert dataset with datasource impl") {
     initSparkContext("test_bulk_insert_datasource")
     val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")