@@ -72,15 +72,6 @@ public class HoodieCommonConfig extends HoodieConfig {
+ "This enables us, to always extend the table's schema during evolution and never lose the data (when, for "
+ "ex, existing column is being dropped in a new batch)");

-public static final ConfigProperty<Boolean> MAKE_NEW_COLUMNS_NULLABLE = ConfigProperty
-.key("hoodie.datasource.write.new.columns.nullable")
-.defaultValue(false)
-.markAdvanced()
-.sinceVersion("0.14.0")
-.withDocumentation("When a non-nullable column is added to datasource during a write operation, the write "
-+ " operation will fail schema compatibility check. Set this option to true will make the newly added "
-+ " column nullable to successfully complete the write operation.");

public static final ConfigProperty<String> SET_NULL_FOR_MISSING_COLUMNS = ConfigProperty
.key("hoodie.write.set.null.for.missing.columns")
.defaultValue("false")
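For reference, the option deleted above was the opt-in switch that relaxed the nullability of newly added columns during a datasource write. A minimal sketch of how a writer used to enable it before this change (the table name, record key field, and basePath are hypothetical, not taken from this PR):

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch only: pre-change behavior, opting in so that a batch which adds a
// non-nullable column passes the write-time schema compatibility check.
def appendBatchWithNewColumn(df: DataFrame, basePath: String): Unit = {
  df.write.format("hudi")
    .option("hoodie.table.name", "example_table")                   // hypothetical table name
    .option("hoodie.datasource.write.recordkey.field", "event_id")  // hypothetical record key
    .option("hoodie.datasource.write.new.columns.nullable", "true") // the key removed by this change
    .mode(SaveMode.Append)
    .save(basePath)
}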
@@ -25,11 +25,9 @@

import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

-import static org.apache.hudi.common.config.HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE;
import static org.apache.hudi.common.util.CollectionUtils.reduce;
import static org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter.convert;

@@ -136,10 +134,9 @@ public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchem
*
* @param sourceSchema source schema that needs reconciliation
* @param targetSchema target schema that source schema will be reconciled against
-* @param opts config options
* @return schema (based off {@code source} one) that has nullability constraints and datatypes reconciled
*/
-public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
+public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema) {
if (targetSchema.getType() == Schema.Type.NULL || targetSchema.getFields().isEmpty()) {
return sourceSchema;
}
@@ -153,14 +150,12 @@ public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema tar

List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
-boolean makeNewColsNullable = "true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key()));

List<String> nullableUpdateColsInSource = new ArrayList<>();
List<String> typeUpdateColsInSource = new ArrayList<>();
colNamesSourceSchema.forEach(field -> {
// handle columns that needs to be made nullable
-if ((makeNewColsNullable && !colNamesTargetSchema.contains(field))
-|| colNamesTargetSchema.contains(field) && sourceInternalSchema.findField(field).isOptional() != targetInternalSchema.findField(field).isOptional()) {
+if (colNamesTargetSchema.contains(field) && sourceInternalSchema.findField(field).isOptional() != targetInternalSchema.findField(field).isOptional()) {
nullableUpdateColsInSource.add(field);
}
// handle columns that needs type to be updated
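With the flag gone, reconcileSchemaRequirements takes just the two schemas and only aligns nullability for columns that exist in both of them. A rough sketch of the new call shape (the enclosing utility class is not named in this view; AvroSchemaEvolutionUtils and the hand-built schemas below are assumptions for illustration):

import org.apache.avro.Schema
import org.apache.hudi.avro.AvroSchemaEvolutionUtils // class location assumed, not shown in this diff

object ReconcileSketch {
  def main(args: Array[String]): Unit = {
    // "ts" is non-null in the incoming batch but nullable in the table schema;
    // reconciliation is expected to carry the table's nullability into the result.
    val tableSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |{"name":"id","type":"string"},
        |{"name":"ts","type":["null","string"],"default":null}]}""".stripMargin)
    val incomingSchema: Schema = new Schema.Parser().parse(
      """{"type":"record","name":"rec","fields":[
        |{"name":"id","type":"string"},
        |{"name":"ts","type":"string"}]}""".stripMargin)

    val reconciled: Schema =
      AvroSchemaEvolutionUtils.reconcileSchemaRequirements(incomingSchema, tableSchema)
    println(reconciled.toString(true))
  }
}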
@@ -565,8 +565,6 @@ object DataSourceWriteOptions {

val SET_NULL_FOR_MISSING_COLUMNS: ConfigProperty[String] = HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS

-val MAKE_NEW_COLUMNS_NULLABLE: ConfigProperty[java.lang.Boolean] = HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE

val SPARK_SQL_INSERT_INTO_OPERATION: ConfigProperty[String] = ConfigProperty
.key("hoodie.spark.sql.insert.into.operation")
.defaultValue(WriteOperationType.INSERT.value())
@@ -206,7 +206,7 @@ object HoodieSchemaUtils {
* TODO support casing reconciliation
*/
private def canonicalizeSchema(sourceSchema: Schema, latestTableSchema: Schema, opts : Map[String, String]): Schema = {
-reconcileSchemaRequirements(sourceSchema, latestTableSchema, opts)
+reconcileSchemaRequirements(sourceSchema, latestTableSchema)
}


@@ -83,7 +83,6 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
-hoodieConfig.setDefaultValue(MAKE_NEW_COLUMNS_NULLABLE)
hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
hoodieConfig.setDefaultValue(KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED)
Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
@@ -24,7 +24,7 @@ import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
-import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
+import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline, TimelineUtils}
@@ -38,7 +38,6 @@ import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.functional.CommonOptionUtils._
import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
-import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.metrics.{Metrics, MetricsReporterType}
@@ -1749,50 +1748,6 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
assertEquals(0, result.filter(result("id") === 1).count())
}

-/** Test case to verify MAKE_NEW_COLUMNS_NULLABLE config parameter. */
-@Test
-def testSchemaEvolutionWithNewColumn(): Unit = {
-val df1 = spark.sql("select '1' as event_id, '2' as ts, '3' as version, 'foo' as event_date")
-var hudiOptions = Map[String, String](
-HoodieWriteConfig.TBL_NAME.key() -> "test_hudi_merger",
-KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key() -> "event_id",
-KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key() -> "version",
-DataSourceWriteOptions.OPERATION.key() -> "insert",
-HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key() -> "ts",
-HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key() -> "org.apache.hudi.keygen.ComplexKeyGenerator",
-KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key() -> "true",
-HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key() -> "false",
-HoodieWriteConfig.RECORD_MERGER_IMPLS.key() -> "org.apache.hudi.HoodieSparkRecordMerger"
-)
-df1.write.format("hudi").options(hudiOptions).mode(SaveMode.Append).save(basePath)
-
-// Try adding a string column. This operation is expected to throw 'schema not compatible' exception since
-// 'MAKE_NEW_COLUMNS_NULLABLE' parameter is 'false' by default.
-val df2 = spark.sql("select '2' as event_id, '2' as ts, '3' as version, 'foo' as event_date, 'bar' as add_col")
-try {
-(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-fail("Option succeeded, but was expected to fail.")
-} catch {
-case ex: org.apache.hudi.exception.HoodieInsertException => {
-assertTrue(ex.getMessage.equals("Failed insert schema compatibility check"))
-}
-case ex: Exception => {
-fail(ex)
-}
-}
-
-// Try adding the string column again. This operation is expected to succeed since 'MAKE_NEW_COLUMNS_NULLABLE'
-// parameter has been set to 'true'.
-hudiOptions = hudiOptions + (HoodieCommonConfig.MAKE_NEW_COLUMNS_NULLABLE.key() -> "true")
-try {
-(df2.write.format("hudi").options(hudiOptions).mode("append").save(basePath))
-} catch {
-case ex: Exception => {
-fail(ex)
-}
-}
-}

def assertLastCommitIsUpsert(): Boolean = {
val metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)