@@ -67,6 +67,14 @@ public class HoodieCommonConfig extends HoodieConfig {
.defaultValue(true)
.withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");

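// Allowed values for RECONCILE_SCHEMA_STRATEGY below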
public static final String LEGACY_RECONCILE_STRATEGY = "legacy_reconcile_strategy";
public static final String DYNAMIC_SCHEMA_RECONCILE_STRATEGY = "dynamic_schema_reconcile_strategy";

public static final ConfigProperty<String> RECONCILE_SCHEMA_STRATEGY = ConfigProperty
.key("hoodie.datasource.write.reconcile.schema.strategy")
.defaultValue(LEGACY_RECONCILE_STRATEGY)
.withDocumentation("To be fixed");

public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
}
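For context, a minimal usage sketch of the new option through the Spark datasource writer (the DataFrame `df` and `basePath` are placeholders, and the sketch assumes the existing `hoodie.datasource.write.reconcile.schema` option for enabling reconciliation):

import org.apache.spark.sql.SaveMode

df.write
  .format("hudi")
  // enable schema reconciliation and opt into the strategy introduced above
  .option("hoodie.datasource.write.reconcile.schema", "true")
  .option("hoodie.datasource.write.reconcile.schema.strategy", "dynamic_schema_reconcile_strategy")
  .mode(SaveMode.Append)
  .save(basePath)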
@@ -415,20 +415,19 @@ object HoodieSparkSqlWriter {

val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key,
HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean
val reconcileSchemaStrategy = opts.getOrDefault(HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.key(),
HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.defaultValue())

if (shouldReconcileSchema) {
internalSchemaOpt match {
case Some(internalSchema) =>
// Apply schema evolution, by auto-merging write schema and read schema
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
SchemaReconcileUtils.reconcileSchema(sourceSchema, canonicalizedSourceSchema, internalSchema, latestTableSchema.getFullName)
case None =>
// In case schema reconciliation is enabled we will employ the configured reconciliation
// strategy to produce the target writer's schema (see SchemaReconcileUtils.reconcileSchemas)
val (reconciledSchema, isCompatible) =
reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema)
SchemaReconcileUtils.reconcileSchemas(latestTableSchema, canonicalizedSourceSchema, reconcileSchemaStrategy)

// NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
// w/ the table's one and allow schemas to diverge. This is required in cases where
@@ -540,31 +539,6 @@ object HoodieSparkSqlWriter {
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key() -> schemaValidateEnable)
}

private def reconcileSchemasLegacy(tableSchema: Schema, newSchema: Schema): (Schema, Boolean) = {
// Legacy reconciliation implements following semantic
// - In case new-schema is a "compatible" projection of the existing table's one (projection allowing
// permitted type promotions), table's schema would be picked as (reconciled) writer's schema;
// - Otherwise, we'd fall back to picking new (batch's) schema as a writer's schema;
//
// Philosophically, such semantic aims at always choosing a "wider" schema, ie the one containing
// the other one (schema A contains schema B, if schema B is a projection of A). This enables us,
// to always "extend" the schema during schema evolution and hence never lose the data (when, for ex
// existing column is being dropped in a new batch)
//
// NOTE: By default Hudi doesn't allow automatic schema evolution to drop the columns from the target
// table. However, when schema reconciliation is turned on, we would allow columns to be dropped
// in the incoming batch (as these would be reconciled in anyway)
if (isCompatibleProjectionOf(tableSchema, newSchema)) {
// Picking table schema as a writer schema we need to validate that we'd be able to
// rewrite incoming batch's data (written in new schema) into it
(tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
} else {
// Picking new schema as a writer schema we need to validate that we'd be able to
// rewrite table's data into it
(newSchema, isSchemaCompatible(tableSchema, newSchema, true))
}
}

/**
* get latest internalSchema from table
*
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.avro.Schema
import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils

import scala.collection.JavaConversions.asScalaBuffer

object SchemaReconcileUtils {

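/**
 * Reconciles the incoming batch's schema against the table's latest internal schema (schema-on-read path),
 * converting the merged result back to Avro and stripping Hudi metadata fields when the source batch
 * doesn't carry the record-key metadata field.
 */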
def reconcileSchema(sourceSchema: Schema, canonicalizedSourceSchema: Schema, internalSchema: InternalSchema, tableSchemaName: String) : Schema = {
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, tableSchemaName)
val shouldRemoveMetaDataFromInternalSchema = sourceSchema.getFields().filter(f => f.name().equalsIgnoreCase(HoodieRecord.RECORD_KEY_METADATA_FIELD)).isEmpty
if (shouldRemoveMetaDataFromInternalSchema) HoodieAvroUtils.removeMetadataFields(evolvedSchema) else evolvedSchema
}

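/**
 * Reconciles the table's schema with the incoming batch's schema according to the configured strategy,
 * returning the resolved writer's schema along with a flag indicating whether data can be rewritten into it.
 */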
def reconcileSchemas(tableSchema: Schema, newSchema: Schema, reconcileStrategy: String): (Schema, Boolean) = {
if (reconcileStrategy.equalsIgnoreCase(HoodieCommonConfig.LEGACY_RECONCILE_STRATEGY)) {
// Legacy reconciliation implements following semantic
// - In case new-schema is a "compatible" projection of the existing table's one (projection allowing
// permitted type promotions), table's schema would be picked as (reconciled) writer's schema;
// - Otherwise, we'd fall back to picking new (batch's) schema as a writer's schema;
//
// Philosophically, such semantic aims at always choosing a "wider" schema, ie the one containing
// the other one (schema A contains schema B, if schema B is a projection of A). This enables us,
// to always "extend" the schema during schema evolution and hence never lose the data (when, for ex
// existing column is being dropped in a new batch)
//
// NOTE: By default Hudi doesn't allow automatic schema evolution to drop the columns from the target
// table. However, when schema reconciliation is turned on, we would allow columns to be dropped
// in the incoming batch (as these would be reconciled in anyway)
if (isCompatibleProjectionOf(tableSchema, newSchema)) {
// Picking table schema as a writer schema we need to validate that we'd be able to
// rewrite incoming batch's data (written in new schema) into it
(tableSchema, isSchemaCompatible(newSchema, tableSchema, true))
} else {
// Picking new schema as a writer schema we need to validate that we'd be able to
// rewrite table's data into it
(newSchema, isSchemaCompatible(tableSchema, newSchema, true))
}
} else {
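// Dynamic reconciliation merges the incoming batch's schema into the table's schema, so the
// resulting writer's schema retains the table's existing columns and additionally picks up
// any new columns introduced by the batch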
val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(newSchema, AvroInternalSchemaConverter.convert(tableSchema))
val evolvedSchema = AvroInternalSchemaConverter.convert(mergedInternalSchema, tableSchema.getFullName)
(evolvedSchema, isSchemaCompatible(evolvedSchema, tableSchema, true))
}
}
}
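To illustrate the difference between the two strategies, a small sketch (the Avro schema literals are made up for the example; only SchemaReconcileUtils and the HoodieCommonConfig constants above are taken from this change):

import org.apache.avro.Schema
import org.apache.hudi.common.config.HoodieCommonConfig

// Table schema: {id, age}; the incoming batch's schema drops `age` and adds `city`
val tableSchema = new Schema.Parser().parse(
  """{"type":"record","name":"rec","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"age","type":["null","string"],"default":null}]}""".stripMargin)
val incomingSchema = new Schema.Parser().parse(
  """{"type":"record","name":"rec","fields":[
    |  {"name":"id","type":"string"},
    |  {"name":"city","type":["null","string"],"default":null}]}""".stripMargin)

// Legacy strategy: incomingSchema is not a projection of tableSchema (it adds `city`),
// so the incoming schema is picked as the writer's schema and `age` is dropped from it
val (legacyWriterSchema, _) =
  SchemaReconcileUtils.reconcileSchemas(tableSchema, incomingSchema, HoodieCommonConfig.LEGACY_RECONCILE_STRATEGY)

// Dynamic strategy: the two schemas are merged, so the writer's schema keeps `age` and gains `city`
val (dynamicWriterSchema, _) =
  SchemaReconcileUtils.reconcileSchemas(tableSchema, incomingSchema, HoodieCommonConfig.DYNAMIC_SCHEMA_RECONCILE_STRATEGY)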
@@ -19,7 +19,8 @@ package org.apache.hudi.functional

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, WriteOperationType}
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
@@ -29,9 +30,9 @@ import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
@@ -345,6 +346,191 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionSupport

// TODO add test w/ overlapping updates
}

@ParameterizedTest
@CsvSource(value = Array(
"COPY_ON_WRITE,upsert,dynamic_schema_reconcile_strategy"
))
def testDynamicSchemaReconcileStrategy(tableType: HoodieTableType, opType: String, reconcileStrategy: String): Unit = {
val opts = commonOpts ++
Map(
DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key -> "true",
HoodieCommonConfig.RECONCILE_SCHEMA_STRATEGY.key -> reconcileStrategy,
DataSourceWriteOptions.RECONCILE_SCHEMA.key() -> "true",
DataSourceWriteOptions.OPERATION.key -> opType
)

def appendData(schema: StructType, batch: Seq[Row], shouldAllowDroppedColumns: Boolean = false): Unit = {
HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema)
.write
.format("org.apache.hudi")
.options(opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString))
.mode(SaveMode.Append)
.save(basePath)
}

def loadTable(loadAllVersions: Boolean = true): (StructType, Seq[Row]) = {
val tableMetaClient = HoodieTableMetaClient.builder()
.setConf(spark.sparkContext.hadoopConfiguration)
.setBasePath(basePath)
.build()

tableMetaClient.reloadActiveTimeline()

val resolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))

val tablePath = if (loadAllVersions) {
s"$basePath/*/*"
} else {
basePath
}

val df =
spark.read.format("org.apache.hudi")
.load(tablePath)
.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
.orderBy(functions.col("_row_key").cast(IntegerType))

(latestTableSchema, df.collectAsList().toSeq)
}

//
// 1. Write 1st batch with schema A
//

val firstSchema = StructType(
StructField("_row_key", StringType, nullable = true) ::
StructField("first_name", StringType, nullable = false) ::
StructField("last_name", StringType, nullable = true) ::
StructField("timestamp", IntegerType, nullable = true) ::
StructField("partition", IntegerType, nullable = true) :: Nil)

val firstBatch = Seq(
Row("1", "Andy", "Cooper", 1, 1),
Row("2", "Lisi", "Wallace", 1, 1),
Row("3", "Zhangsan", "Shu", 1, 1))

HoodieUnsafeUtils.createDataFrameFromRows(spark, firstBatch, firstSchema)
.write
.format("org.apache.hudi")
.options(opts)
.mode(SaveMode.Overwrite)
.save(basePath)

//
// 2. Write 2nd batch with another schema (added column `age`)
//

val secondSchema = StructType(
StructField("_row_key", StringType, nullable = true) ::
StructField("first_name", StringType, nullable = false) ::
StructField("last_name", StringType, nullable = true) ::
StructField("age", StringType, nullable = true) ::
StructField("timestamp", IntegerType, nullable = true) ::
StructField("partition", IntegerType, nullable = true) :: Nil)

val secondBatch = Seq(
Row("4", "John", "Green", "10", 1, 1),
Row("5", "Jack", "Sparrow", "13", 1, 1),
Row("6", "Jill", "Fiorella", "12", 1, 1))

appendData(secondSchema, secondBatch)
val (tableSchemaAfterSecondBatch, rowsAfterSecondBatch) = loadTable()

// NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the new batch's schema (since it's adding a
// new column, compared w/ the table's one), therefore this case would be identical to reconciliation
// being DISABLED
//
// In case schema reconciliation is DISABLED, table will be overwritten in the batch's schema,
// entailing that the data in the added columns for table's existing records will be added w/ nulls,
// in case new column is nullable, and would fail otherwise
assertEquals(secondSchema, tableSchemaAfterSecondBatch)

val expectedRowsAfterSecondBatch =
injectColumnAt(firstBatch, secondSchema.indexWhere(_.name == "age"), null) ++ secondBatch

assertEquals(expectedRowsAfterSecondBatch, rowsAfterSecondBatch)

//
// 3. Write 3rd batch with another schema (omitting a _nullable_ column `last_name`); expected to succeed
// since reconcile is enabled
//
val thirdSchema = StructType(
StructField("_row_key", StringType, nullable = true) ::
StructField("first_name", StringType, nullable = false) ::
StructField("age", StringType, nullable = true) ::
StructField("timestamp", IntegerType, nullable = true) ::
StructField("partition", IntegerType, nullable = true) :: Nil)

val thirdBatch = Seq(
Row("7", "Harry", "15", 1, 1),
Row("8", "Ron", "14", 1, 1),
Row("9", "Germiona", "16", 1, 1))

appendData(thirdSchema, thirdBatch)
val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable()

// NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the table's schema over the new batch
// schema (since we drop the column in the new batch), therefore table's schema after commit will actually
// stay the same, adding back (dropped) columns to the records in the batch (setting them as null).
//
// In case schema reconciliation is DISABLED, table will be overwritten in the batch's schema,
// entailing that the data in the dropped columns for table's existing records will be dropped.
assertEquals(secondSchema, tableSchemaAfterThirdBatch)

val lastNameColOrd = firstSchema.indexWhere(_.name == "last_name")
val expectedRows = rowsAfterSecondBatch ++ injectColumnAt(thirdBatch, lastNameColOrd, null)

assertEquals(expectedRows, rowsAfterThirdBatch)

//
// 4. Write 4th batch omitting 2 existing columns (`last_name`, `age`) and adding 2 new columns (`dob`, `city`)
//

val fourthSchema = StructType(
StructField("_row_key", StringType, nullable = true) ::
StructField("first_name", StringType, nullable = false) ::
StructField("timestamp", IntegerType, nullable = true) ::
StructField("partition", IntegerType, nullable = true) ::
StructField("dob", StringType, nullable = false) ::
StructField("city", StringType, nullable = false) :: Nil)

val finalTableSchema = StructType(
StructField("_row_key", StringType, nullable = true) ::
StructField("first_name", StringType, nullable = false) ::
StructField("last_name", StringType, nullable = true) ::
StructField("age", StringType, nullable = true) ::
StructField("timestamp", IntegerType, nullable = true) ::
StructField("partition", IntegerType, nullable = true) ::
StructField("dob", StringType, nullable = true) ::
StructField("city", StringType, nullable = true) :: Nil)

val fourthBatch = Seq(
Row("10", "Jon", 1, 1, "01/01/2000", "new_york"),
Row("11", "Jack", 1, 1, "01/01/2001", "san_francisco"),
Row("12", "Tim", 1, 1, "01/01/2002", "seattle"))

appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true)
val (tableSchemaAfterFourthBatch, rowsAfterFourthBatch) = loadTable()

assertEquals(finalTableSchema, tableSchemaAfterFourthBatch)

// With the dynamic strategy, the omitted columns (`last_name`, `age`) are retained in the table schema
// and the newly added columns (`dob`, `city`) are appended to it; missing values are filled with nulls
val ageColOrd = secondSchema.indexWhere(_.name == "age")
val dobColOrd = finalTableSchema.indexWhere(_.name == "dob")
val cityColOrd = finalTableSchema.indexWhere(_.name == "city")
val fourthBatchWithLastName = injectColumnAt(fourthBatch, lastNameColOrd, null)
val fourthBatchWithLastNameAndAge = injectColumnAt(fourthBatchWithLastName, ageColOrd, null)
val rowsAfterThirdWithDob = injectColumnAt(rowsAfterThirdBatch, dobColOrd, null)
val rowsAfterThirdWithDobAndCity = injectColumnAt(rowsAfterThirdWithDob, cityColOrd, null)
val expectedRowsAfterFourthBatch = rowsAfterThirdWithDobAndCity ++ fourthBatchWithLastNameAndAge

assertEquals(expectedRowsAfterFourthBatch, rowsAfterFourthBatch)
}
}

object TestBasicSchemaEvolution {