diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
index 8a1dbf04b3b58..1a4c2e317807f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java
@@ -72,15 +72,6 @@ public class HoodieCommonConfig extends HoodieConfig {
           + "This enables us, to always extend the table's schema during evolution and never lose the data (when, for "
           + "ex, existing column is being dropped in a new batch)");
 
-  public static final ConfigProperty<Boolean> MAKE_NEW_COLUMNS_NULLABLE = ConfigProperty
-      .key("hoodie.datasource.write.new.columns.nullable")
-      .defaultValue(false)
-      .markAdvanced()
-      .sinceVersion("0.14.0")
-      .withDocumentation("When a non-nullable column is added to datasource during a write operation, the write "
-          + " operation will fail schema compatibility check. Set this option to true will make the newly added "
-          + " column nullable to successfully complete the write operation.");
-
   public static final ConfigProperty<String> SET_NULL_FOR_MISSING_COLUMNS = ConfigProperty
       .key("hoodie.write.set.null.for.missing.columns")
       .defaultValue("false")
diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
index e714d99f0e0ec..cf7e87f457b80 100644
--- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
@@ -25,11 +25,9 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.config.HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE;
 import static org.apache.hudi.common.util.CollectionUtils.reduce;
 import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;
 
@@ -136,10 +134,9 @@ public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchem
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled against
-   * @param opts config options
    * @return schema (based off {@code source} one) that has nullability constraints and datatypes reconciled
    */
-  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
+  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema) {
     if (targetSchema.getType() == Schema.Type.NULL || targetSchema.getFields().isEmpty()) {
       return sourceSchema;
     }
@@ -153,14 +150,12 @@ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema tar
 
     List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
-    boolean makeNewColsNullable = "true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key()));
 
     List<String> nullableUpdateColsInSource = new ArrayList<>();
     List<String> typeUpdateColsInSource = new ArrayList<>();
     colNamesSourceSchema.forEach(field -> {
       // handle columns that needs to be made nullable
-      if ((makeNewColsNullable && !colNamesTargetSchema.contains(field))
-          || colNamesTargetSchema.contains(field) && sourceInternalSchema.findField(field).isOptional() != targetInternalSchema.findField(field).isOptional()) {
+      if (colNamesTargetSchema.contains(field) && sourceInternalSchema.findField(field).isOptional() != targetInternalSchema.findField(field).isOptional()) {
         nullableUpdateColsInSource.add(field);
       }
       // handle columns that needs type to be updated
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 47a7c61a60fa2..f93209c983674 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -565,8 +565,6 @@ object DataSourceWriteOptions {
 
   val SET_NULL_FOR_MISSING_COLUMNS: ConfigProperty[String] = HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS
 
-  val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE
-
   val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.spark.sql.insert.into.operation")
     .defaultValue(WriteOperationType.INSERT.value())
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
index ed073ce4b1747..0b42dc75b5417 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala
@@ -206,7 +206,7 @@ object HoodieSchemaUtils {
    * TODO support casing reconciliation
    */
   private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema, opts : Map[String, String]): Schema = {
-    reconcileSchemaRequirements(sourceSchema, latestTableSchema, opts)
+    reconcileSchemaRequirements(sourceSchema, latestTableSchema)
   }
 
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 133f641d280bc..63495b0eede67 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -83,7 +83,6 @@ object HoodieWriterUtils {
     hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
     hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
     hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
-    hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
     hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
     Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cb0209de979cc..a28a228fd4683 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils}
@@ -38,7 +38,6 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.functional.CommonOptionUtils._
 import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
-import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
@@ -1749,50 +1748,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
     assertEquals(0, result.filter(result("id") === 1).count())
   }
 
-  /** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
-  @Test
-  def testSchemaEvolutionWithNewColumn(): Unit = {
-    val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date")
-    var hudiOptions = Map[String, String](
-      HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
-      KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
-      KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
-      DataSourceWriteOptions.OPERATION.key() -> "insert",
-      HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
-      HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator",
-      KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
-      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
-      HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> "org.apache.hudi.HoodieSparkRecordMerger"
-    )
-    df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
-
-    // Try adding a string column. This operation is expected to throw 'schema not compatible' exception since
-    // 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
-    val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 'foo' as event_date, 'bar' as add_col")
-    try {
-      (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-      fail("Option succeeded, but was expected to fail.")
-    } catch {
-      case ex: org.apache.hudi.exception.HoodieInsertException => {
-        assertTrue(ex.getMessage.equals("Failed insert schema compatibility check"))
-      }
-      case ex: Exception => {
-        fail(ex)
-      }
-    }
-
-    // Try adding the string column again. This operation is expected to succeed since 'MAKE_NEW_COLUMNS_NULLABLE'
-    // parameter has been set to 'true'.
-    hudiOptions = hudiOptions + (HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE.key() -> "true")
-    try {
-      (df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-    } catch {
-      case ex: Exception => {
-        fail(ex)
-      }
-    }
-  }
-
   def assertLastCommitIsUpsert(): Boolean = {
     val metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
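
For reference, a minimal sketch of what a caller looks like after this patch: it exercises the simplified two-argument `reconcileSchemaRequirements(Schema, Schema)` signature from the hunk in `AvroSchemaEvolutionUtils.java`. The record name, field names, and the main class are illustrative assumptions and not part of the patch; only the method's new signature and the nullability-mismatch handling shown in the diff are taken from it. The removed `hoodie.datasource.write.new.columns.nullable` flag has no direct replacement in this change.

```java
// Illustrative sketch only: schemas and class name are made up; the two-argument
// signature of reconcileSchemaRequirements comes from the patch above.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;

public class ReconcileSignatureExample {
  public static void main(String[] args) {
    // Existing table schema: "id" is required, "note" is nullable.
    Schema tableSchema = SchemaBuilder.record("example_record").fields()
        .requiredString("id")
        .optionalString("note")
        .endRecord();

    // Incoming batch declares "note" as non-nullable; per the hunk above, fields
    // present in both schemas with mismatched nullability are collected for a
    // nullability update during reconciliation.
    Schema incomingSchema = SchemaBuilder.record("example_record").fields()
        .requiredString("id")
        .requiredString("note")
        .endRecord();

    // Previously: reconcileSchemaRequirements(incomingSchema, tableSchema, opts)
    Schema reconciled =
        AvroSchemaEvolutionUtils.reconcileSchemaRequirements(incomingSchema, tableSchema);
    System.out.println(reconciled.toString(true));
  }
}
```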