@@ -70,6 +70,14 @@ public class HoodieCommonConfig extends HoodieConfig {
+ " operation will fail schema compatibility check. Set this option to true will make the newly added "
+ " column nullable to successfully complete the write operation.");

public static final ConfigProperty<Boolean> ADD_NULL_FOR_DELETED_COLUMNS = ConfigProperty
.key("hoodie.datasource.add.null.for.deleted.columns")
.defaultValue(false)
.markAdvanced()
.withDocumentation("When a non-nullable column is deleted in datasource during a write operation, the write "
+ " operation will fail schema compatibility check. Set this option to true will make the deleted "
+ " column be filled with null values to successfully complete the write operation.");

public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.common.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
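The new ADD_NULL_FOR_DELETED_COLUMNS config is consumed on the Spark datasource write path. A minimal, hypothetical usage sketch follows; the table name, record key/precombine fields, base path, and the demo DataFrame are illustrative assumptions, not part of this PR:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.config.HoodieCommonConfig
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

// This batch no longer carries the `extra` column that already exists in the table schema.
val batch = Seq(("k1", 10L)).toDF("uuid", "ts")

batch.write.format("hudi")
  .option("hoodie.table.name", "demo_tbl")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "uuid")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts")
  // Without this option the write fails the schema compatibility check because a table
  // column is missing from the batch; with it, the missing column is written as null.
  .option(HoodieCommonConfig.ADD_NULL_FOR_DELETED_COLUMNS.key(), "true")
  .mode("append")
  .save("file:///tmp/hudi_demo_tbl") // assumed path of an existing table containing `extra`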
@@ -536,6 +536,8 @@ object DataSourceWriteOptions {

val RECONCILE_SCHEMA: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA

val ADD_NULL_FOR_DELETED_COLUMNS: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.ADD_NULL_FOR_DELETED_COLUMNS

val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE

val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi

import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.internal.schema.InternalSchema

/**
* Util methods for Schema evolution in Hudi
*/
object HoodieSchemaUtils {
/**
* Fetches the table's latest InternalSchema from commit metadata.
*
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of HoodieTableMetaClient
* @return Some(InternalSchema) if schema evolution is enabled and the latest commit metadata carries an internal schema; None otherwise.
*/
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} catch {
case _: Exception => None
}
}
}
}
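For context, here is a hedged sketch of how the relocated helper might be invoked. The base path, Hadoop configuration, and setter calls are assumptions for illustration only:

import org.apache.hadoop.conf.Configuration
import org.apache.hudi.{DataSourceReadOptions, HoodieSchemaUtils}
import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.HoodieTableMetaClient

// Assumed location of an existing Hudi table.
val metaClient = HoodieTableMetaClient.builder()
  .setConf(new Configuration())
  .setBasePath("file:///tmp/hudi_demo_tbl")
  .build()

val config = new HoodieConfig()
config.setValue(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "true")

// Some(InternalSchema) only when schema evolution is enabled and the latest commit
// metadata carries an internal schema; None otherwise (including on any exception).
val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(config, metaClient)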
@@ -28,7 +28,7 @@ import org.apache.hudi.DataSourceUtils.tryOverrideParquetWriteLegacyFormatProper
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, resolveNullableSchema}
import org.apache.hudi.avro.AvroSchemaUtils.{canProject, isCompatibleProjectionOf, isSchemaCompatible, isStrictProjectionOf, resolveNullableSchema}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -283,7 +283,7 @@ object HoodieSparkSqlWriter {
.getOrElse(getAvroRecordNameAndNamespace(tblName))

val sourceSchema = convertStructTypeToAvroSchema(df.schema, avroRecordName, avroRecordNamespace)
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient).orElse {
// In case we need to reconcile the schema and schema evolution is enabled,
// we will force-apply schema evolution to the writer's schema
if (shouldReconcileSchema && hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
@@ -317,7 +317,7 @@ object HoodieSparkSqlWriter {
}

// Create a HoodieWriteClient & issue the delete.
val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val internalSchemaOpt = HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, tableMetaClient)
val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
null, path, tblName,
mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key)))
@@ -499,6 +499,7 @@ object HoodieSparkSqlWriter {
latestTableSchemaOpt: Option[Schema],
internalSchemaOpt: Option[InternalSchema],
opts: Map[String, String]): Schema = {
val addNullForDeletedColumns = opts(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key()).toBoolean
val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
@@ -579,7 +580,10 @@ object HoodieSparkSqlWriter {
if (!shouldValidateSchemasCompatibility) {
// if no validation is enabled, check for col drop
// if col drop is allowed, go ahead. if not, check for projection, so that we do not allow dropping cols
if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) {
if (addNullForDeletedColumns && isStrictProjectionOf(latestTableSchema, canonicalizedSourceSchema)
&& isSchemaCompatible(canonicalizedSourceSchema, latestTableSchema, true)) {
latestTableSchema
} else if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) {
canonicalizedSourceSchema
} else {
log.error(
@@ -717,29 +721,6 @@ object HoodieSparkSqlWriter {
reconcileNullability(sourceSchema, latestTableSchema, opts)
}


/**
* get latest internalSchema from table
*
* @param config instance of {@link HoodieConfig}
* @param tableMetaClient instance of HoodieTableMetaClient
* @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required.
*/
def getLatestTableInternalSchema(config: HoodieConfig,
tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
Option.empty[InternalSchema]
} else {
try {
val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
} catch {
case _: Exception => None
}
}
}

private def registerAvroSchemasWithKryo(sparkContext: SparkContext, targetAvroSchemas: Schema*): Unit = {
sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*)
}
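The new branch in deduceWriterSchema keeps the full table schema when the incoming batch is a strict projection of it, so the dropped column is later populated with nulls. Below is a minimal sketch of that predicate, assuming the semantics implied by the AvroSchemaUtils method names; the record and field names are made up:

import org.apache.avro.SchemaBuilder
import org.apache.hudi.avro.AvroSchemaUtils.{isSchemaCompatible, isStrictProjectionOf}

// Table schema has `id` plus an optional `extra`; the incoming batch only carries `id`.
val latestTableSchema = SchemaBuilder.record("rec").namespace("ns").fields()
  .requiredString("id")
  .optionalLong("extra")
  .endRecord()
val sourceSchema = SchemaBuilder.record("rec").namespace("ns").fields()
  .requiredString("id")
  .endRecord()

val addNullForDeletedColumns = true // hoodie.datasource.add.null.for.deleted.columns
val writerSchema =
  if (addNullForDeletedColumns
    && isStrictProjectionOf(latestTableSchema, sourceSchema)
    && isSchemaCompatible(sourceSchema, latestTableSchema, true)) {
    latestTableSchema // write with the table schema; `extra` is filled with nulls
  } else {
    sourceSchema
  }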
@@ -83,6 +83,7 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
hoodieConfig.setDefaultValue(ADD_NULL_FOR_DELETED_COLUMNS)
hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
@@ -20,8 +20,8 @@ package org.apache.hudi

import org.apache.avro.generic.GenericRecord
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -133,9 +133,9 @@ class TestHoodieSparkUtils {
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.getOrCreate

val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
val innerStruct1 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
val structType1 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
.add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true)
val schema1 = AvroConversionUtils.convertStructTypeToAvroSchema(structType1, "test_struct_name", "test_namespace")
val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 2L)))

@@ -146,8 +146,8 @@

// create schema2 which has one addition column at the root level compared to schema1
val structType2 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
.add("nullableInnerStruct2",innerStruct1,true)
.add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct1, true)
.add("nullableInnerStruct2", innerStruct1, true)
val schema2 = AvroConversionUtils.convertStructTypeToAvroSchema(structType2, "test_struct_name", "test_namespace")
val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L), Row("innerKey2_3", 2L)))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), structType2)
@@ -161,12 +161,12 @@
assert(genRecRDD3.collect()(0).getSchema.equals(schema2))
genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2")))

val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
.add("new_nested_col","string",true)
val innerStruct3 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
.add("new_nested_col", "string", true)

// create a schema which has one additional nested column compared to schema1, which is nullable
val structType4 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct3,true)
.add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct3, true)

val schema4 = AvroConversionUtils.convertStructTypeToAvroSchema(structType4, "test_struct_name", "test_namespace")
val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L, "new_nested_col_val1")))
@@ -180,16 +180,16 @@
org.apache.hudi.common.util.Option.of(schema4))
assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
val genRec = genRecRDD5.collect()(0)
val nestedRec : GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord]
val nestedRec: GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord]
assertNull(nestedRec.get("new_nested_col"))
assertNotNull(nestedRec.get("innerKey"))
assertNotNull(nestedRec.get("innerValue"))

val innerStruct4 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
.add("new_nested_col","string",false)
val innerStruct4 = new StructType().add("innerKey", "string", false).add("innerValue", "long", true)
.add("new_nested_col", "string", false)
// create a schema which has one additional nested column compared to schema1, which is non nullable
val structType6 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct4,true)
.add("nonNullableInnerStruct", innerStruct1, false).add("nullableInnerStruct", innerStruct4, true)

val schema6 = AvroConversionUtils.convertStructTypeToAvroSchema(structType6, "test_struct_name", "test_namespace")
// convert batch 1 with schema5. should fail since the missed out column is not nullable.
@@ -212,3 +212,28 @@
def convertRowListToSeq(inputList: java.util.List[Row]): Seq[Row] =
JavaConverters.asScalaIteratorConverter(inputList.iterator).asScala.toSeq
}

object TestHoodieSparkUtils {


def setNullableRec(structType: StructType, columnName: Array[String], index: Int): StructType = {
StructType(structType.map {
case StructField(name, StructType(fields), nullable, metadata) if name.equals(columnName(index)) =>
StructField(name, setNullableRec(StructType(fields), columnName, index + 1), nullable, metadata)
case StructField(name, ArrayType(StructType(fields), _), nullable, metadata) if name.equals(columnName(index)) =>
StructField(name, ArrayType(setNullableRec(StructType(fields), columnName, index + 1)), nullable, metadata)
case StructField(name, dataType, _, metadata) if name.equals(columnName(index)) =>
StructField(name, dataType, nullable = false, metadata)
case y: StructField => y
})
}

def setColumnNotNullable(df: DataFrame, columnName: String): DataFrame = {
// get schema
val schema = df.schema
// mark the (possibly nested) [[StructField]] addressed by `columnName` as non-nullable
val newSchema = setNullableRec(schema, columnName.split('.'), 0)
// apply new schema
df.sqlContext.createDataFrame(df.rdd, newSchema)
}
}
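A hypothetical example of the new test helper in action; the SparkSession setup, DataFrame, and column names are illustrative only:

import org.apache.hudi.TestHoodieSparkUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Nested struct column `nested` with tuple fields `_1` (string) and `_2` (long).
val df = Seq(("key1", ("inner1", 1L))).toDF("key", "nested")

// Mark the nested field `nested._1` as non-nullable; handy for building schemas that
// should pass or fail Hudi's schema compatibility checks in tests.
val strictDf = TestHoodieSparkUtils.setColumnNotNullable(df, "nested._1")
strictDf.schema.printTreeString() // `_1` inside `nested` is now reported as nullable = false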
@@ -93,6 +93,8 @@

import static java.lang.String.format;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.utilities.UtilHelpers.buildProperties;
import static org.apache.hudi.utilities.UtilHelpers.readConfig;

/**
* An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
@@ -170,7 +172,7 @@ private static TypedProperties combineProperties(Config cfg, Option<TypedPropert
} else if (cfg.propsFilePath.equals(Config.DEFAULT_DFS_SOURCE_PROPERTIES)) {
hoodieConfig.setAll(UtilHelpers.getConfig(cfg.configs).getProps());
} else {
hoodieConfig.setAll(UtilHelpers.readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps());
hoodieConfig.setAll(readConfig(hadoopConf, new Path(cfg.propsFilePath), cfg.configs).getProps());
}

// set any configs that Deltastreamer has to override explicitly
@@ -429,6 +431,12 @@ public boolean isInlineCompactionEnabled() {
&& HoodieTableType.MERGE_ON_READ.equals(HoodieTableType.valueOf(tableType));
}

public static TypedProperties getProps(FileSystem fs, Config cfg) {
return cfg.propsFilePath.isEmpty()
? buildProperties(cfg.configs)
: readConfig(fs.getConf(), new Path(cfg.propsFilePath), cfg.configs).getProps();
}

@Override
public boolean equals(Object o) {
if (this == o) {