@@ -23,6 +23,7 @@
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.client.utils.SparkRowSerDe;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieKeyException;
 import org.apache.log4j.LogManager;
@@ -480,6 +481,7 @@ private Object[] getNestedFieldValues(InternalRow row, HoodieUnsafeRowUtils.Nest
   private HoodieUnsafeRowUtils.NestedFieldPath[] resolveNestedFieldPaths(List<String> fieldPaths, StructType schema, boolean returnNull) {
     try {
       return fieldPaths.stream()
+          .filter(fieldPath -> !StringUtils.isNullOrEmpty(fieldPath))
           .map(fieldPath -> HoodieUnsafeRowUtils$.MODULE$.composeNestedFieldPath(schema, fieldPath))
           .toArray(HoodieUnsafeRowUtils.NestedFieldPath[]::new);
     } catch (Exception e) {
@@ -50,7 +50,7 @@ public SimpleKeyGenerator(TypedProperties props) {
   SimpleKeyGenerator(TypedProperties props, String recordKeyField, String partitionPathField) {
     super(props);
     // Make sure key-generator is configured properly
-    validateRecordKey(recordKeyField);
+    validateRecordKey(recordKeyField, autoGenerateRecordKeys);
     validatePartitionPath(partitionPathField);
 
     this.recordKeyFields = recordKeyField == null ? Collections.emptyList() : Collections.singletonList(recordKeyField);
@@ -126,8 +126,8 @@ private static void validatePartitionPath(String partitionPathField) {
         String.format("Single partition-path field is expected; provided (%s)", partitionPathField));
   }
 
-  private static void validateRecordKey(String recordKeyField) {
-    checkArgument(recordKeyField == null || !recordKeyField.isEmpty(),
+  private static void validateRecordKey(String recordKeyField, boolean isAutoGenerateRecordKeyEnabled) {
+    checkArgument(recordKeyField == null || !recordKeyField.isEmpty() || isAutoGenerateRecordKeyEnabled,
         "Record key field has to be non-empty!");
     checkArgument(recordKeyField == null || !recordKeyField.contains(FIELDS_SEP),
         String.format("Single record-key field is expected; provided (%s)", recordKeyField));

Review comment (from a project member), on the new checkArgument:

  Should the validation on the record key field be:
    1. It is not empty.
    2. Or, isAutoGenerateRecordKeyEnabled.

  I am assuming the record key field can be null only when isAutoGenerateRecordKeyEnabled is true. We can take it as a follow-up after confirming why a null record key field was allowed in the first place.
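For concreteness, here is a minimal sketch of the tightened check the reviewer is proposing (a follow-up idea, not part of this PR). It reuses the class's existing checkArgument and FIELDS_SEP helpers, and assumes a null or empty record key field is acceptable only when auto generation is enabled:

  // Hypothetical follow-up: reject a null/empty record key field outright
  // unless auto generation of record keys is enabled.
  private static void validateRecordKey(String recordKeyField, boolean isAutoGenerateRecordKeyEnabled) {
    checkArgument((recordKeyField != null && !recordKeyField.isEmpty()) || isAutoGenerateRecordKeyEnabled,
        "Record key field has to be non-empty, or auto generation of record keys has to be enabled!");
    checkArgument(recordKeyField == null || !recordKeyField.contains(FIELDS_SEP),
        String.format("Single record-key field is expected; provided (%s)", recordKeyField));
  }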
@@ -531,8 +531,12 @@ public String getPreCombineField() {
   }
 
   public Option<String[]> getRecordKeyFields() {
-    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, HoodieRecord.RECORD_KEY_METADATA_FIELD);
-    return Option.of(Arrays.stream(keyFieldsValue.split(","))
+    return getRecordKeyFields(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  public Option<String[]> getRecordKeyFields(String defaultValue) {
+    String keyFieldsValue = getStringOrDefault(RECORDKEY_FIELDS, defaultValue);
+    return keyFieldsValue == null ? Option.empty() : Option.of(Arrays.stream(keyFieldsValue.split(","))
         .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[] {}));
   }
 
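To illustrate the contract of the new overload (a sketch; tableConfig stands for the table config of a table with no record-key fields set, and _hoodie_record_key is the value of HoodieRecord.RECORD_KEY_METADATA_FIELD):

  // No record-key fields configured in the table config:
  Option<String[]> withMetaFallback = tableConfig.getRecordKeyFields();
  // -> Option.of(new String[] {"_hoodie_record_key"}), the old default behavior

  Option<String[]> withoutFallback = tableConfig.getRecordKeyFields(null);
  // -> Option.empty(), letting callers treat "no keys configured" explicitly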
@@ -376,7 +376,7 @@ object DataSourceWriteOptions {
    * Key generator class, that implements will extract the key out of incoming record.
    */
   val keyGeneratorInferFunc = JFunction.toJavaFunction((config: HoodieConfig) => {
-    Option.of(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
+    Option.ofNullable(DataSourceOptionsHelper.inferKeyGenClazz(config.getProps))
   })
 
   val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
@@ -862,6 +862,10 @@ object DataSourceOptionsHelper {
   def inferKeyGenClazz(recordsKeyFields: String, partitionFields: String): String = {
     if (!StringUtils.isNullOrEmpty(partitionFields)) {
       val numPartFields = partitionFields.split(",").length
+      // Inference may not work when auto generation of record keys is enabled.
+      if (StringUtils.isNullOrEmpty(recordsKeyFields)) {
+        return null
+      }
       val numRecordKeyFields = recordsKeyFields.split(",").length
       if (numPartFields == 1 && numRecordKeyFields == 1) {
         classOf[SimpleKeyGenerator].getName
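Why the switch to ofNullable: once inferKeyGenClazz can return null, wrapping it in Option.of would throw. A tiny sketch of the difference, assuming Hudi's org.apache.hudi.common.util.Option follows the usual of/ofNullable semantics:

  import org.apache.hudi.common.util.Option;

  // Stand-in for what inferKeyGenClazz(...) now returns when no record key
  // fields are configured (e.g. auto-generated keys).
  String inferred = null;
  Option<String> keyGen = Option.ofNullable(inferred); // Option.empty(); Option.of(null) would throw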
@@ -117,7 +117,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       if (tableConfig.populateMetaFields()) {
         HoodieRecord.RECORD_KEY_METADATA_FIELD
       } else {
-        val keyFields = tableConfig.getRecordKeyFields.get()
+        val keyFields = tableConfig.getRecordKeyFields().get()
         checkState(keyFields.length == 1)
         keyFields.head
       }
@@ -99,13 +99,13 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
+    var (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
 
     // Validate datasource and tableconfig keygen are the same
-    validateKeyGeneratorConfig(originKeyGeneratorClassName, tableConfig);
+    parameters = validateAndSetKeyGeneratorConfigs(originKeyGeneratorClassName, hoodieConfig, parameters, tableConfig);
     validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite);
 
     val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "")
@@ -22,22 +22,30 @@ import org.apache.hudi.DataSourceWriteOptions.{RECORD_MERGER_IMPLS, _}
 import org.apache.hudi.common.config.HoodieMetadataConfig.ENABLE
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieCommonConfig, HoodieConfig}
 import org.apache.hudi.common.table.HoodieTableConfig
-import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.common.util.StringUtils
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieException, HoodieKeyGeneratorException}
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.util.SparkKeyGenUtils
+import org.apache.log4j.LogManager
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.hudi.command.SqlKeyGenerator
 
 import java.util.Properties
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
+/**
+ * WriterUtils to assist in the write path in Datasource and tests.
+ */
 object HoodieWriterUtils {
 
+  private val log = LogManager.getLogger(getClass)
+
   def javaParametersWithWriteDefaults(parameters: java.util.Map[String, String]): java.util.Map[String, String] = {
     mapAsJavaMap(parametersWithWriteDefaults(parameters.asScala.toMap))
   }
@@ -55,7 +63,10 @@ object HoodieWriterUtils {
     val hoodieConfig: HoodieConfig = new HoodieConfig(props)
     hoodieConfig.setDefaultValue(OPERATION)
     hoodieConfig.setDefaultValue(TABLE_TYPE)
-    hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
+    // When auto generation of record keys is enabled, do not set the preCombine field; otherwise the default value from code would be picked up.
+    if (!parameters.getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()).toBoolean) {
+      hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
+    }
     hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
     hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
     hoodieConfig.setDefaultValue(ENABLE)
@@ -185,7 +196,9 @@ object HoodieWriterUtils {
   /**
   * Detects conflicts between datasourceKeyGen and existing table configuration keyGen
   */
-  def validateKeyGeneratorConfig(datasourceKeyGen: String, tableConfig: HoodieConfig): Unit = {
+  def validateAndSetKeyGeneratorConfigs(datasourceKeyGen: String, hoodieConfig: HoodieConfig,
+                                        inputParams: Map[String, String], tableConfig: HoodieConfig)
+  : Map[String, String] = {
     val diffConfigs = StringBuilder.newBuilder
 
     if (null != tableConfig) {
@@ -203,6 +216,51 @@ object HoodieWriterUtils {
       diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
       throw new HoodieException(diffConfigs.toString.trim)
     }
+
+    val parameters = mutable.Map() ++ inputParams
+
+    // Auto generation of record keys needs some special handling when setting configs.
+    if (hoodieConfig.getBoolean(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS)) {
+      val autoGenerateRecordKeyConfigKey = KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key()
+      // If the user explicitly sets combine-before-insert, we fail: de-dup is not supported with auto generation of record keys.
+      if (hoodieConfig.getBoolean(HoodieWriteConfig.COMBINE_BEFORE_INSERT)) {
+        throw new HoodieKeyGeneratorException(s"Config $autoGenerateRecordKeyConfigKey can not be used when " +
+          s"${HoodieWriteConfig.COMBINE_BEFORE_INSERT.key()} is enabled")
+      }
+      // Enable the concat handle to ensure duplicate records are not de-duped.
+      if (!hoodieConfig.getBoolean(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE)) {
+        hoodieConfig.setValue(HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE, "true")
+        parameters += (HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key() -> "true")
+        log.warn(s"Enabling config ${HoodieWriteConfig.MERGE_ALLOW_DUPLICATE_ON_INSERTS_ENABLE.key()} when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // The MOR table type is not supported when auto generation of record keys is enabled, since auto generation is meant only for immutable workloads.
+      if (hoodieConfig.getString(DataSourceWriteOptions.TABLE_TYPE) == MOR_TABLE_TYPE_OPT_VAL) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.TABLE_TYPE.key()} should be set to " +
+          s"$COW_TABLE_TYPE_OPT_VAL when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // If OPERATION is explicitly set to UPSERT by the user, throw an exception. If the user relies on the
+      // default value, the operation is overridden to INSERT_OPERATION_OPT_VAL.
+      if (parameters.getOrElse(OPERATION.key(), StringUtils.EMPTY_STRING) == UPSERT_OPERATION_OPT_VAL) {
+        throw new HoodieKeyGeneratorException(s"Config ${OPERATION.key()} should not be set to $UPSERT_OPERATION_OPT_VAL" +
+          s" when $autoGenerateRecordKeyConfigKey is used")
+      } else if (hoodieConfig.getString(OPERATION) == UPSERT_OPERATION_OPT_VAL) {
+        hoodieConfig.setValue(OPERATION.key(), INSERT_OPERATION_OPT_VAL)
+        parameters += (OPERATION.key() -> INSERT_OPERATION_OPT_VAL)
+        log.warn(s"Setting config ${OPERATION.key()} to $INSERT_OPERATION_OPT_VAL when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // A preCombine field does not make sense when auto generation of record keys is enabled.
+      if (!StringUtils.isNullOrEmpty(hoodieConfig.getString(DataSourceWriteOptions.PRECOMBINE_FIELD))) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.PRECOMBINE_FIELD.key()} should not be set" +
+          s" when $autoGenerateRecordKeyConfigKey is used")
+      }
+      // The record key field config should not be set when auto generation of record keys is used.
+      if (!StringUtils.isNullOrEmpty(hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD))) {
+        throw new HoodieKeyGeneratorException(s"Config ${DataSourceWriteOptions.RECORDKEY_FIELD.key()} should not be set " +
+          s"when $autoGenerateRecordKeyConfigKey is used")
+      }
+    }
+
+    parameters.toMap
+  }
 
   private def getStringFromTableConfigWithAlternatives(tableConfig: HoodieConfig, key: String): String = {
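Taken together, these checks imply a write shaped roughly like the following sketch (hypothetical usage, not from the PR; df and basePath are assumed to exist, the auto-generate option key is referenced via its constant, and the remaining option keys are standard Hudi datasource configs):

  import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
  import org.apache.spark.sql.SaveMode;

  // df: an incoming Dataset<Row>; basePath: the table base path (both assumed).
  df.write().format("hudi")
      .option("hoodie.table.name", "events")                         // assumed table name
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE") // MOR is rejected by the new validation
      .option("hoodie.datasource.write.operation", "insert")         // explicit upsert is rejected
      .option(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(), "true")
      // Note: no record key field and no preCombine field -- both are
      // disallowed when record keys are auto-generated.
      .mode(SaveMode.Append)
      .save(basePath);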
@@ -141,7 +141,7 @@ class HoodieCDCRDD(
   private lazy val recordKeyField: String = if (populateMetaFields) {
     HoodieRecord.RECORD_KEY_METADATA_FIELD
   } else {
-    val keyFields = metaClient.getTableConfig.getRecordKeyFields.get()
+    val keyFields = metaClient.getTableConfig.getRecordKeyFields().get()
     checkState(keyFields.length == 1)
     keyFields.head
   }
@@ -111,7 +111,8 @@ class HoodieCatalogTable(val spark: SparkSession, var table: CatalogTable) exten
   /**
    * Record Field List(Primary Key List)
    */
-  lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields.orElse(Array.empty)
+  // FIX ME?
+  lazy val primaryKeys: Array[String] = tableConfig.getRecordKeyFields(null).orElse(Array.empty)
 
   /**
    * PreCombine Field
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
 import org.apache.hudi.common.model.{HoodieAvroRecordMerger, HoodieRecordMerger}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.{StringUtils, ValidationUtils}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType
 
@@ -193,13 +194,17 @@ object HoodieOptionConfig {
   def validateTable(spark: SparkSession, schema: StructType, sqlOptions: Map[String, String]): Unit = {
     val resolver = spark.sessionState.conf.resolver
 
-    // validate primary key
-    val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
-      .map(_.split(",").filter(_.length > 0))
-    ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
-    primaryKeys.get.foreach { primaryKey =>
-      ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
-        s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+    // validate primary keys only when auto generation of record keys is not enabled
+    if (!sqlOptions.getOrElse(KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.key(),
+      KeyGeneratorOptions.AUTO_GENERATE_RECORD_KEYS.defaultValue()).toBoolean) {
+      // validate primary key
+      val primaryKeys = sqlOptions.get(SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName)
+        .map(_.split(",").filter(_.length > 0))
+      ValidationUtils.checkArgument(primaryKeys.nonEmpty, "No `primaryKey` is specified.")
+      primaryKeys.get.foreach { primaryKey =>
+        ValidationUtils.checkArgument(schema.exists(f => resolver(f.name, getRootLevelFieldName(primaryKey))),
+          s"Can't find primaryKey `$primaryKey` in ${schema.treeString}.")
+      }
     }
 
     // validate preCombine key