From f6df8030d8daacb65904ea8c48442d1f5fcab0eb Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Sun, 5 Feb 2023 17:13:15 -0800
Subject: [PATCH] Adding support for dynamic schemas

---
 .../common/config/HoodieCommonConfig.java     |   8 +
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  34 +---
 .../apache/hudi/SchemaReconcileUtils.scala    |  72 +++++++
 .../functional/TestBasicSchemaEvolution.scala | 192 +++++++++++++++++-
 4 files changed, 273 insertions(+), 33 deletions(-)
 create mode 100644 hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SchemaReconcileUtils.scala

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 25ed017fee86c..7bd37767e8308 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -67,6 +67,14 @@ public class HoodieCommonConfig extends HoodieConfig {
       .defaultValue(true)
       .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");
 
+  public static final String LEGACY_RECONCILE_STRATEGY = "legacy_reconcile_strategy";
+  public static final String DYNAMIC_SCHEMA_RECONCILE_STRATEGY = "dynamic_schema_reconcile_strategy";
+
+  public static final ConfigProperty<String> RECONCILE_SCHEMA_STRATEGY = ConfigProperty
+      .key("hoodie.datasource.write.reconcile.schema.strategy")
+      .defaultValue(LEGACY_RECONCILE_STRATEGY)
+      .withDocumentation("Strategy used to reconcile the incoming batch schema with the table schema when schema reconciliation is enabled. 'legacy_reconcile_strategy' picks the wider of the table schema and the incoming batch schema, while 'dynamic_schema_reconcile_strategy' merges the two, retaining columns missing from the incoming batch and appending newly added columns to the table schema.");
+
   public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
     return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 764f9474ee0e2..f1428bcc76016 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -415,20 +415,19 @@ object HoodieSparkSqlWriter {
 
       val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
         HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
 
+      val reconcileSchemaStrategy = opts.getOrDefault(HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.key(),
+        HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.defaultValue())
       if (shouldReconcileSchema) {
         internalSchemaOpt match {
           case Some(internalSchema) =>
             // Apply schema evolution, by auto-merging write schema and read schema
-            val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
-            val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
-            val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
-            if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
+            SchemaReconcileUtils.reconcileSchema(sourceSchema, canonicalizedSourceSchema, internalSchema, latestTableSchema.getFullName)
           case None =>
             // In case schema reconciliation is enabled we will employ (legacy) reconciliation
            // strategy to produce target writer's schema (see definition below)
            val (reconciledSchema, isCompatible) =
-              reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
+              SchemaReconcileUtils.reconcileSchemas(latestTableSchema, canonicalizedSourceSchema, reconcileSchemaStrategy)
 
            // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
            //       w/ the table's one and allow schemas to diverge. This is required in cases where
@@ -540,31 +539,6 @@ object HoodieSparkSqlWriter {
       HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> schemaValidateEnable)
   }
 
-  private def reconcileSchemasLegacy(tableSchema: Schema, newSchema: Schema): (Schema, Boolean) = {
-    // Legacy reconciliation implements following semantic
-    //    - In case new-schema is a "compatible" projection of the existing table's one (projection allowing
-    //      permitted type promotions), table's schema would be picked as (reconciled) writer's schema;
-    //    - Otherwise, we'd fall back to picking new (batch's) schema as a writer's schema;
-    //
-    // Philosophically, such semantic aims at always choosing a "wider" schema, ie the one containing
-    // the other one (schema A contains schema B, if schema B is a projection of A). This enables us,
-    // to always "extend" the schema during schema evolution and hence never lose the data (when, for ex
-    // existing column is being dropped in a new batch)
-    //
-    // NOTE: By default Hudi doesn't allow automatic schema evolution to drop the columns from the target
-    //       table. However, when schema reconciliation is turned on, we would allow columns to be dropped
-    //       in the incoming batch (as these would be reconciled in anyway)
-    if (isCompatibleProjectionOf(tableSchema, newSchema)) {
-      // Picking table schema as a writer schema we need to validate that we'd be able to
-      // rewrite incoming batch's data (written in new schema) into it
-      (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
-    } else {
-      // Picking new schema as a writer schema we need to validate that we'd be able to
-      // rewrite table's data into it
-      (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
-    }
-  }
-
   /**
    * get latest internalSchema from table
    *
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SchemaReconcileUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SchemaReconcileUtils.scala
new file mode 100644
index 0000000000000..b8fd78c1015c5
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SchemaReconcileUtils.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
+import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils
+
+import scala.collection.JavaConversions.asScalaBuffer
+
+object SchemaReconcileUtils {
+
+  def reconcileSchema(sourceSchema: Schema, canonicalizedSourceSchema: Schema, internalSchema: InternalSchema, tableSchemaName: String): Schema = {
+    val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
+    val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, tableSchemaName)
+    val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
+    if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
+  }
+
+  def reconcileSchemas(tableSchema: Schema, newSchema: Schema, reconcileStrategy: String): (Schema, Boolean) = {
+    if (reconcileStrategy.equalsIgnoreCase(HoodieCommonConfig.LEGACY_RECONCILE_STRATEGY)) {
+      // Legacy reconciliation implements the following semantics:
+      //    - In case the new schema is a "compatible" projection of the existing table's one (projection allowing
+      //      permitted type promotions), the table's schema is picked as the (reconciled) writer's schema;
+      //    - Otherwise, we fall back to picking the new (batch's) schema as the writer's schema;
+      //
+      // Philosophically, this semantic aims at always choosing the "wider" schema, i.e. the one containing
+      // the other one (schema A contains schema B if schema B is a projection of A). This enables us
+      // to always "extend" the schema during schema evolution and hence never lose data (when, for example,
+      // an existing column is dropped in a new batch)
+      //
+      // NOTE: By default Hudi doesn't allow automatic schema evolution to drop columns from the target
+      //       table. However, when schema reconciliation is turned on, we allow columns to be dropped
+      //       in the incoming batch (as these would be reconciled anyway)
+      if (isCompatibleProjectionOf(tableSchema, newSchema)) {
+        // Picking table schema as a writer schema we need to validate that we'd be able to
+        // rewrite incoming batch's data (written in new schema) into it
+        (tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
+      } else {
+        // Picking new schema as a writer schema we need to validate that we'd be able to
+        // rewrite table's data into it
+        (newSchema, isSchemaCompatible(tableSchema, newSchema, true))
+      }
+    } else {
+      val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(newSchema, AvroInternalSchemaConverter.convert(tableSchema))
+      val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, tableSchema.getFullName)
+      (evolvedSchema, isSchemaCompatible(evolvedSchema, tableSchema, true))
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index ccbd04a45b68a..d60362667182f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -19,7 +19,8 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
+import org.apache.hudi.common.config.HoodieCommonConfig
+import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util
 import org.apache.hudi.config.HoodieWriteConfig
@@ -29,9 +30,9 @@ import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
-import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
@@ -345,6 +346,191 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS
     // TODO add test w/ overlapping updates
   }
 
+
+  @ParameterizedTest
+  @CsvSource(value = Array(
+    "COPY_ON_WRITE,upsert,dynamic_schema_reconcile_strategy"
+  ))
+  def testDynamicSchemaReconcileStrategy(tableType: HoodieTableType, opType: String, reconcileStrategy: String): Unit = {
+    val opts = commonOpts ++
+      Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
+        HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key -> "true",
+        HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.key -> reconcileStrategy,
+        DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true",
+        DataSourceWriteOptions.OPERATION.key -> opType
+      )
+
+    def appendData(schema: StructType, batch: Seq[Row], shouldAllowDroppedColumns: Boolean = false): Unit = {
+      HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema)
+        .write
+        .format("org.apache.hudi")
+        .options(opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString))
+        .mode(SaveMode.Append)
+        .save(basePath)
+    }
+
+    def loadTable(loadAllVersions: Boolean = true): (StructType, Seq[Row]) = {
+      val tableMetaClient = HoodieTableMetaClient.builder()
+        .setConf(spark.sparkContext.hadoopConfiguration)
+        .setBasePath(basePath)
+        .build()
+
+      tableMetaClient.reloadActiveTimeline()
+
+      val resolver = new TableSchemaResolver(tableMetaClient)
+      val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))
+
+      val tablePath = if (loadAllVersions) {
+        s"$basePath/*/*"
+      } else {
+        basePath
+      }
+
+      val df =
+        spark.read.format("org.apache.hudi")
+          .load(tablePath)
+          .drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
+          .orderBy(functions.col("_row_key").cast(IntegerType))
+
+      (latestTableSchema, df.collectAsList().toSeq)
+    }
+
+    //
+    // 1. Write 1st batch with schema A
+    //
+
+    val firstSchema = StructType(
+      StructField("_row_key", StringType, nullable = true) ::
+        StructField("first_name", StringType, nullable = false) ::
+        StructField("last_name", StringType, nullable = true) ::
+        StructField("timestamp", IntegerType, nullable = true) ::
+        StructField("partition", IntegerType, nullable = true) :: Nil)
+
+    val firstBatch = Seq(
+      Row("1", "Andy", "Cooper", 1, 1),
+      Row("2", "Lisi", "Wallace", 1, 1),
+      Row("3", "Zhangsan", "Shu", 1, 1))
+
+    HoodieUnsafeUtils.createDataFrameFromRows(spark, firstBatch, firstSchema)
+      .write
+      .format("org.apache.hudi")
+      .options(opts)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    //
+    // 2. Write 2nd batch with another schema (added column `age`)
+    //
+
+    val secondSchema = StructType(
+      StructField("_row_key", StringType, nullable = true) ::
+        StructField("first_name", StringType, nullable = false) ::
+        StructField("last_name", StringType, nullable = true) ::
+        StructField("age", StringType, nullable = true) ::
+        StructField("timestamp", IntegerType, nullable = true) ::
+        StructField("partition", IntegerType, nullable = true) :: Nil)
+
+    val secondBatch = Seq(
+      Row("4", "John", "Green", "10", 1, 1),
+      Row("5", "Jack", "Sparrow", "13", 1, 1),
+      Row("6", "Jill", "Fiorella", "12", 1, 1))
+
+    appendData(secondSchema, secondBatch)
+    val (tableSchemaAfterSecondBatch, rowsAfterSecondBatch) = loadTable()
+
+    // NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the new batch's schema (since it's adding a
+    //       new column, compared w/ the table's one), therefore this case would be identical to reconciliation
+    //       being DISABLED
+    //
+    //       In case schema reconciliation is DISABLED, the table will be written in the batch's schema,
+    //       entailing that the added columns for the table's existing records will be filled w/ nulls
+    //       (in case the new column is nullable), and the write would fail otherwise
+    if (true) {
+      assertEquals(secondSchema, tableSchemaAfterSecondBatch)
+
+      val ageColOrd = secondSchema.indexWhere(_.name == "age")
+      val expectedRows = injectColumnAt(firstBatch, ageColOrd, null) ++ secondBatch
+
+      assertEquals(expectedRows, rowsAfterSecondBatch)
+    }
+
+    //
+    // 3. Write 3rd batch with another schema (omitting a _nullable_ column `last_name`); expected to succeed since
+    //    reconcile is enabled
+    //
+    val thirdSchema = StructType(
+      StructField("_row_key", StringType, nullable = true) ::
+        StructField("first_name", StringType, nullable = false) ::
+        StructField("age", StringType, nullable = true) ::
+        StructField("timestamp", IntegerType, nullable = true) ::
+        StructField("partition", IntegerType, nullable = true) :: Nil)
+
+    val thirdBatch = Seq(
+      Row("7", "Harry", "15", 1, 1),
+      Row("8", "Ron", "14", 1, 1),
+      Row("9", "Germiona", "16", 1, 1))
+
+    appendData(thirdSchema, thirdBatch)
+    val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable()
+
+    // NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the table's schema over the new batch
+    //       schema (since we drop the column in the new batch), therefore table's schema after commit will actually
+    //       stay the same, adding back (dropped) columns to the records in the batch (setting them as null).
+    //
+    //       In case schema reconciliation is DISABLED, the table will be written in the batch's schema,
+    //       entailing that the data in the dropped columns for the table's existing records will be dropped.
+    assertEquals(secondSchema, tableSchemaAfterThirdBatch)
+
+    val lastNameColOrd = firstSchema.indexWhere(_.name == "last_name")
+    val expectedRows = rowsAfterSecondBatch ++ injectColumnAt(thirdBatch, lastNameColOrd, null)
+
+    assertEquals(expectedRows, rowsAfterThirdBatch)
+
+    //
+    // 4. Write 4th batch omitting 2 existing cols (last_name, age) and adding 2 new cols (dob, city)
+    //
+
+    val fourthSchema = StructType(
+      StructField("_row_key", StringType, nullable = true) ::
+        StructField("first_name", StringType, nullable = false) ::
+        StructField("timestamp", IntegerType, nullable = true) ::
+        StructField("partition", IntegerType, nullable = true) ::
+        StructField("dob", StringType, nullable = false) ::
+        StructField("city", StringType, nullable = false) :: Nil)
+
+    val finalTableSchema = StructType(
+      StructField("_row_key", StringType, nullable = true) ::
+        StructField("first_name", StringType, nullable = false) ::
+        StructField("last_name", StringType, nullable = true) ::
+        StructField("age", StringType, nullable = true) ::
+        StructField("timestamp", IntegerType, nullable = true) ::
+        StructField("partition", IntegerType, nullable = true) ::
+        StructField("dob", StringType, nullable = true) ::
+        StructField("city", StringType, nullable = true) :: Nil)
+
+    val fourthBatch = Seq(
+      Row("10", "Jon", 1, 1, "01/01/2000", "new_york"),
+      Row("11", "Jack", 1, 1, "01/01/2001", "san_francisco"),
+      Row("12", "Tim", 1, 1, "01/01/2002", "seattle"))
+
+    appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)
+    val (tableSchemaAfterFourthBatch, rowsAfterFourthBatch) = loadTable()
+
+    assertEquals(finalTableSchema, tableSchemaAfterFourthBatch)
+
+    // with dynamic schemas, dropped columns should be retained and new columns should be added to the table schema as well
+    val ageColOrd = secondSchema.indexWhere(_.name == "age")
+    val dobColOrd = finalTableSchema.indexWhere(_.name == "dob")
+    val cityColOrd = finalTableSchema.indexWhere(_.name == "city")
+    val fourthBatchWithLastName = injectColumnAt(fourthBatch, lastNameColOrd, null)
+    val fourthBatchWithLastNameAndAge = injectColumnAt(fourthBatchWithLastName, ageColOrd, null)
+    val rowsAfterThirdWithDob = injectColumnAt(rowsAfterThirdBatch, dobColOrd, null)
+    val rowsAfterThirdWithDobAndCity = injectColumnAt(rowsAfterThirdWithDob, cityColOrd, null)
+    val expectedRowsAfterFourthBatch = rowsAfterThirdWithDobAndCity ++ fourthBatchWithLastNameAndAge
+
+    assertEquals(expectedRowsAfterFourthBatch, rowsAfterFourthBatch)
+  }
 }
 
 object TestBasicSchemaEvolution {
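
Usage sketch (illustrative only, not part of the patch): the snippet below shows how a Spark datasource writer could opt into the new strategy introduced above. The two reconcile option keys are taken from the diff; the table name, record key, precombine field, input path, and base path are placeholders.

  import org.apache.spark.sql.SaveMode

  // Any DataFrame whose schema may drift between batches; the source path is a placeholder.
  val batchDf = spark.read.json("/tmp/incoming_batch")

  batchDf.write
    .format("hudi")
    .option("hoodie.table.name", "my_table")                          // placeholder table name
    .option("hoodie.datasource.write.recordkey.field", "_row_key")    // placeholder record key field
    .option("hoodie.datasource.write.precombine.field", "timestamp")  // placeholder precombine field
    // Existing switch that turns schema reconciliation on.
    .option("hoodie.datasource.write.reconcile.schema", "true")
    // New option introduced by this patch; defaults to "legacy_reconcile_strategy".
    .option("hoodie.datasource.write.reconcile.schema.strategy", "dynamic_schema_reconcile_strategy")
    .mode(SaveMode.Append)
    .save("/tmp/hudi/my_table")                                       // placeholder base path

Because the config defaults to 'legacy_reconcile_strategy', reconcile-enabled pipelines keep the current wider-schema behaviour unless they explicitly opt into the dynamic strategy.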