diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
index bf560810ae99b..723c9f3cbff48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.lock.LockProvider;
+import org.apache.hudi.common.util.Option;
 
 import java.io.File;
 import java.io.FileReader;
@@ -159,9 +160,11 @@ public class HoodieLockConfig extends HoodieConfig {
   public static final ConfigProperty<String> ZK_LOCK_KEY = ConfigProperty
       .key(ZK_LOCK_KEY_PROP_KEY)
       .noDefaultValue()
+      .withInferFunction(p -> Option.ofNullable(p.getStringOrDefault(HoodieWriteConfig.TBL_NAME, null)))
       .sinceVersion("0.8.0")
       .withDocumentation("Key name under base_path at which to create a ZNode and acquire lock. "
-          + "Final path on zk will look like base_path/lock_key. We recommend setting this to the table name");
+          + "Final path on zk will look like base_path/lock_key. If this parameter is not set, "
+          + "the table name is used as the lock key");
 
   // Pluggable type of lock provider
   public static final ConfigProperty<String> LOCK_PROVIDER_CLASS_NAME = ConfigProperty
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
index 7870d8551e090..9612914303588 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/ConfigProperty.java
@@ -23,6 +23,8 @@
 import org.apache.hudi.exception.HoodieException;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Function;
 import java.util.Objects;
@@ -93,8 +95,8 @@ Option<Function<HoodieConfig, Option<T>>> getInferFunc() {
     return inferFunction;
   }
 
-  public String[] getAlternatives() {
-    return alternatives;
+  public List<String> getAlternatives() {
+    return Arrays.asList(alternatives);
   }
 
   public ConfigProperty<T> withDocumentation(String doc) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
index 88335db130ae7..a4161cf1e375b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/DFSPropertiesConfiguration.java
@@ -96,7 +96,7 @@ public static TypedProperties loadGlobalProps() {
       try {
         conf.addPropsFromFile(new Path(DEFAULT_CONF_FILE_DIR));
       } catch (Exception ignored) {
-        LOG.debug("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
+        LOG.warn("Didn't find config file under default conf file dir: " + DEFAULT_CONF_FILE_DIR);
       }
     }
     return conf.getProps();
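
Context for the changes above: a `ConfigProperty` can now carry an infer function that is consulted when the property is not explicitly set, before any static default applies. The sketch below restates that resolution order in a minimal, self-contained form; `Prop` and `resolve` are illustrative names, not Hudi's actual API:

```scala
object InferSketch {
  // A config property with an optional static default and an infer function.
  final case class Prop(key: String,
                        default: Option[String],
                        infer: Map[String, String] => Option[String])

  // Resolution order: explicitly set value > inferred value > static default.
  def resolve(props: Map[String, String], prop: Prop): Option[String] =
    props.get(prop.key).orElse(prop.infer(props)).orElse(prop.default)

  def main(args: Array[String]): Unit = {
    // Mirrors ZK_LOCK_KEY above: no default, inferred from the table name when unset.
    val zkLockKey = Prop("hoodie.write.lock.zookeeper.lock_key", None,
      cfg => cfg.get("hoodie.table.name"))
    println(resolve(Map("hoodie.table.name" -> "trips"), zkLockKey)) // Some(trips)
  }
}
```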
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
index 6ae1ba6e34dde..31e43d773dff7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieConfig.java
@@ -90,7 +90,7 @@ public boolean contains(ConfigProperty<?> configProperty) {
     if (props.containsKey(configProperty.key())) {
       return true;
     }
-    return Arrays.stream(configProperty.getAlternatives()).anyMatch(props::containsKey);
+    return configProperty.getAlternatives().stream().anyMatch(props::containsKey);
   }
 
   private Option<Object> getRawValue(ConfigProperty<?> configProperty) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 94bcc0d0de85e..c1101f3b2011c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -18,18 +18,23 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
-import org.apache.hudi.common.config.ConfigProperty
+import org.apache.hudi.common.config.{ConfigProperty, HoodieConfig}
 import org.apache.hudi.common.fs.ConsistencyGuardConfig
 import org.apache.hudi.common.model.{HoodieTableType, WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.util.ConfigUtils
-import org.apache.hudi.hive.{HiveSyncTool, SlashEncodedDayPartitionValueExtractor}
+import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, HiveSyncTool, MultiPartKeysValueExtractor, NonPartitionedExtractor, SlashEncodedDayPartitionValueExtractor}
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions
-import org.apache.hudi.keygen.{CustomKeyGenerator, SimpleKeyGenerator}
+import org.apache.hudi.keygen.{ComplexKeyGenerator, CustomKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.log4j.LogManager
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils => SparkDataSourceUtils}
 
+import java.util.function.{Function => JavaFunction}
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
 /**
  * List of options that can be passed to the Hoodie datasource,
  * in addition to the hoodie client configs
@@ -211,7 +216,7 @@ object DataSourceWriteOptions {
       .map(SparkDataSourceUtils.decodePartitioningColumns)
       .getOrElse(Nil)
     val keyGeneratorClass = optParams.getOrElse(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
-      DataSourceWriteOptions.DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.defaultValue)
 
     val partitionPathField =
       keyGeneratorClass match {
@@ -273,8 +278,26 @@ object DataSourceWriteOptions {
    */
   val HIVE_STYLE_PARTITIONING = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE
 
-  val KEYGENERATOR_CLASS_NAME = ConfigProperty.key("hoodie.datasource.write.keygenerator.class")
+  /**
+   * Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`
+   * to extract the key out of incoming records.
+   */
+  val keyGeneratorInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
+    if (!p.contains(PARTITIONPATH_FIELD)) {
+      Option.of(classOf[NonpartitionedKeyGenerator].getName)
+    } else {
+      val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
+      if (numOfPartFields == 1) {
+        Option.of(classOf[SimpleKeyGenerator].getName)
+      } else {
+        Option.of(classOf[ComplexKeyGenerator].getName)
+      }
+    }
+  })
+  val KEYGENERATOR_CLASS_NAME: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.write.keygenerator.class")
     .defaultValue(classOf[SimpleKeyGenerator].getName)
+    .withInferFunction(keyGeneratorInferFunc)
     .withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")
 
   val ENABLE_ROW_WRITER: ConfigProperty[String] = ConfigProperty
@@ -364,9 +387,19 @@ object DataSourceWriteOptions {
     .defaultValue("default")
     .withDocumentation("database to sync to")
 
+  val hiveTableOptKeyInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
+    if (p.contains(TABLE_NAME)) {
+      Option.of(p.getString(TABLE_NAME))
+    } else if (p.contains(HoodieWriteConfig.TBL_NAME)) {
+      Option.of(p.getString(HoodieWriteConfig.TBL_NAME))
+    } else {
+      Option.empty[String]()
+    }
+  })
   val HIVE_TABLE: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.table")
     .defaultValue("unknown")
+    .withInferFunction(hiveTableOptKeyInferFunc)
     .withDocumentation("table to sync to")
 
   val HIVE_BASE_FILE_FORMAT: ConfigProperty[String] = ConfigProperty
@@ -389,16 +422,37 @@ object DataSourceWriteOptions {
     .defaultValue("jdbc:hive2://localhost:10000")
     .withDocumentation("Hive metastore url")
 
+  val hivePartitionFieldsInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
+    if (p.contains(PARTITIONPATH_FIELD)) {
+      Option.of(p.getString(PARTITIONPATH_FIELD))
+    } else {
+      Option.empty[String]()
+    }
+  })
   val HIVE_PARTITION_FIELDS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.partition_fields")
     .defaultValue("")
     .withDocumentation("Field in the table to use for determining hive partition columns.")
-
+    .withInferFunction(hivePartitionFieldsInferFunc)
+
+  val hivePartitionExtractorInferFunc = DataSourceOptionsHelper.scalaFunctionToJavaFunction((p: HoodieConfig) => {
+    if (!p.contains(PARTITIONPATH_FIELD)) {
+      Option.of(classOf[NonPartitionedExtractor].getName)
+    } else {
+      val numOfPartFields = p.getString(PARTITIONPATH_FIELD).split(",").length
+      if (numOfPartFields == 1 && p.contains(HIVE_STYLE_PARTITIONING) && p.getString(HIVE_STYLE_PARTITIONING) == "true") {
+        Option.of(classOf[HiveStylePartitionValueExtractor].getName)
+      } else {
+        Option.of(classOf[MultiPartKeysValueExtractor].getName)
+      }
+    }
+  })
   val HIVE_PARTITION_EXTRACTOR_CLASS: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.partition_extractor_class")
     .defaultValue(classOf[SlashEncodedDayPartitionValueExtractor].getCanonicalName)
     .withDocumentation("Class which implements PartitionValueExtractor to extract the partition values, "
       + "default 'SlashEncodedDayPartitionValueExtractor'.")
+    .withInferFunction(hivePartitionExtractorInferFunc)
 
   val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = ConfigProperty
     .key("hoodie.datasource.hive_sync.assume_date_partitioning")
@@ -755,7 +809,7 @@ object DataSourceOptionsHelper {
   // maps the deprecated config name to its latest name
   val allAlternatives: Map[String, String] = {
     val alterMap = scala.collection.mutable.Map[String, String]()
-    allConfigsWithAlternatives.foreach(cfg => cfg.getAlternatives.foreach(alternative => alterMap(alternative) = cfg.key))
+    allConfigsWithAlternatives.foreach(cfg => cfg.getAlternatives.asScala.foreach(alternative => alterMap(alternative) = cfg.key))
     alterMap.toMap
   }
 
@@ -794,4 +848,10 @@ object DataSourceOptionsHelper {
       QUERY_TYPE.key -> queryType
     ) ++ translateConfigurations(parameters)
   }
+
+  implicit def scalaFunctionToJavaFunction[From, To](function: (From) => To): JavaFunction[From, To] = {
+    new JavaFunction[From, To] {
+      override def apply(input: From): To = function(input)
+    }
+  }
 }
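
The key-generator infer function is the heart of this change, so here is its branching restated as a standalone sketch; the key generator class names are the real ones referenced in the diff, while the wrapper object is illustrative:

```scala
import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}

object KeyGenInferenceSketch extends App {
  // Same branching as keyGeneratorInferFunc:
  //   partition path unset -> NonpartitionedKeyGenerator
  //   one partition field  -> SimpleKeyGenerator
  //   several fields       -> ComplexKeyGenerator
  def inferKeyGenerator(partitionPathField: Option[String]): String = partitionPathField match {
    case None => classOf[NonpartitionedKeyGenerator].getName
    case Some(fields) if fields.split(",").length == 1 => classOf[SimpleKeyGenerator].getName
    case _ => classOf[ComplexKeyGenerator].getName
  }

  assert(inferKeyGenerator(None) == classOf[NonpartitionedKeyGenerator].getName)
  assert(inferKeyGenerator(Some("year")) == classOf[SimpleKeyGenerator].getName)
  assert(inferKeyGenerator(Some("year,month")) == classOf[ComplexKeyGenerator].getName)
}
```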
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d1094bb5cd3a7..63b9c4d616316 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -719,8 +719,8 @@ object HoodieSparkSqlWriter {
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
       tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
-    val mergedParams = mutable.Map.empty ++
-      DataSourceWriteOptions.translateSqlOptions(HoodieWriterUtils.parametersWithWriteDefaults(optParams))
+    val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
+    val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index c89c19dde69f5..52d4a528897f8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -48,41 +48,44 @@ object HoodieWriterUtils {
    */
   def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala
-    Map(OPERATION.key -> OPERATION.defaultValue,
-      TABLE_TYPE.key -> TABLE_TYPE.defaultValue,
-      PRECOMBINE_FIELD.key -> PRECOMBINE_FIELD.defaultValue,
-      PAYLOAD_CLASS_NAME.key -> PAYLOAD_CLASS_NAME.defaultValue,
-      RECORDKEY_FIELD.key -> RECORDKEY_FIELD.defaultValue,
-      PARTITIONPATH_FIELD.key -> PARTITIONPATH_FIELD.defaultValue,
-      KEYGENERATOR_CLASS_NAME.key -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
-      ENABLE.key -> ENABLE.defaultValue.toString,
-      COMMIT_METADATA_KEYPREFIX.key -> COMMIT_METADATA_KEYPREFIX.defaultValue,
-      INSERT_DROP_DUPS.key -> INSERT_DROP_DUPS.defaultValue,
-      STREAMING_RETRY_CNT.key -> STREAMING_RETRY_CNT.defaultValue,
-      STREAMING_RETRY_INTERVAL_MS.key -> STREAMING_RETRY_INTERVAL_MS.defaultValue,
-      STREAMING_IGNORE_FAILED_BATCH.key -> STREAMING_IGNORE_FAILED_BATCH.defaultValue,
-      META_SYNC_CLIENT_TOOL_CLASS_NAME.key -> META_SYNC_CLIENT_TOOL_CLASS_NAME.defaultValue,
-      HIVE_SYNC_ENABLED.key -> HIVE_SYNC_ENABLED.defaultValue,
-      META_SYNC_ENABLED.key -> META_SYNC_ENABLED.defaultValue,
-      HIVE_DATABASE.key -> HIVE_DATABASE.defaultValue,
-      HIVE_TABLE.key -> HIVE_TABLE.defaultValue,
-      HIVE_BASE_FILE_FORMAT.key -> HIVE_BASE_FILE_FORMAT.defaultValue,
-      HIVE_USER.key -> HIVE_USER.defaultValue,
-      HIVE_PASS.key -> HIVE_PASS.defaultValue,
-      HIVE_URL.key -> HIVE_URL.defaultValue,
-      HIVE_PARTITION_FIELDS.key -> HIVE_PARTITION_FIELDS.defaultValue,
-      HIVE_PARTITION_EXTRACTOR_CLASS.key -> HIVE_PARTITION_EXTRACTOR_CLASS.defaultValue,
-      HIVE_STYLE_PARTITIONING.key -> HIVE_STYLE_PARTITIONING.defaultValue,
-      HIVE_USE_JDBC.key -> HIVE_USE_JDBC.defaultValue,
-      HIVE_CREATE_MANAGED_TABLE.key() -> HIVE_CREATE_MANAGED_TABLE.defaultValue.toString,
-      HIVE_SYNC_AS_DATA_SOURCE_TABLE.key() -> HIVE_SYNC_AS_DATA_SOURCE_TABLE.defaultValue(),
-      ASYNC_COMPACT_ENABLE.key -> ASYNC_COMPACT_ENABLE.defaultValue,
-      INLINE_CLUSTERING_ENABLE.key -> INLINE_CLUSTERING_ENABLE.defaultValue,
-      ASYNC_CLUSTERING_ENABLE.key -> ASYNC_CLUSTERING_ENABLE.defaultValue,
-      ENABLE_ROW_WRITER.key -> ENABLE_ROW_WRITER.defaultValue,
-      RECONCILE_SCHEMA.key -> RECONCILE_SCHEMA.defaultValue.toString,
-      DROP_PARTITION_COLUMNS.key -> DROP_PARTITION_COLUMNS.defaultValue
-    ) ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
+    val props = new Properties()
+    props.putAll(parameters.asJava)
+    val hoodieConfig: HoodieConfig = new HoodieConfig(props)
+    hoodieConfig.setDefaultValue(OPERATION)
+    hoodieConfig.setDefaultValue(TABLE_TYPE)
+    hoodieConfig.setDefaultValue(PRECOMBINE_FIELD)
+    hoodieConfig.setDefaultValue(PAYLOAD_CLASS_NAME)
+    hoodieConfig.setDefaultValue(RECORDKEY_FIELD)
+    hoodieConfig.setDefaultValue(PARTITIONPATH_FIELD)
+    hoodieConfig.setDefaultValue(KEYGENERATOR_CLASS_NAME)
+    hoodieConfig.setDefaultValue(ENABLE)
+    hoodieConfig.setDefaultValue(COMMIT_METADATA_KEYPREFIX)
+    hoodieConfig.setDefaultValue(INSERT_DROP_DUPS)
+    hoodieConfig.setDefaultValue(STREAMING_RETRY_CNT)
+    hoodieConfig.setDefaultValue(STREAMING_RETRY_INTERVAL_MS)
+    hoodieConfig.setDefaultValue(STREAMING_IGNORE_FAILED_BATCH)
+    hoodieConfig.setDefaultValue(META_SYNC_CLIENT_TOOL_CLASS_NAME)
+    hoodieConfig.setDefaultValue(HIVE_SYNC_ENABLED)
+    hoodieConfig.setDefaultValue(META_SYNC_ENABLED)
+    hoodieConfig.setDefaultValue(HIVE_DATABASE)
+    hoodieConfig.setDefaultValue(HIVE_TABLE)
+    hoodieConfig.setDefaultValue(HIVE_BASE_FILE_FORMAT)
+    hoodieConfig.setDefaultValue(HIVE_USER)
+    hoodieConfig.setDefaultValue(HIVE_PASS)
+    hoodieConfig.setDefaultValue(HIVE_URL)
+    hoodieConfig.setDefaultValue(HIVE_PARTITION_FIELDS)
+    hoodieConfig.setDefaultValue(HIVE_PARTITION_EXTRACTOR_CLASS)
+    hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
+    hoodieConfig.setDefaultValue(HIVE_USE_JDBC)
+    hoodieConfig.setDefaultValue(HIVE_CREATE_MANAGED_TABLE)
+    hoodieConfig.setDefaultValue(HIVE_SYNC_AS_DATA_SOURCE_TABLE)
+    hoodieConfig.setDefaultValue(ASYNC_COMPACT_ENABLE)
+    hoodieConfig.setDefaultValue(INLINE_CLUSTERING_ENABLE)
+    hoodieConfig.setDefaultValue(ASYNC_CLUSTERING_ENABLE)
+    hoodieConfig.setDefaultValue(ENABLE_ROW_WRITER)
+    hoodieConfig.setDefaultValue(RECONCILE_SCHEMA)
+    hoodieConfig.setDefaultValue(DROP_PARTITION_COLUMNS)
+    Map() ++ hoodieConfig.getProps.asScala ++ globalProps ++ DataSourceOptionsHelper.translateConfigurations(parameters)
   }
 
   def toProperties(params: Map[String, String]): TypedProperties = {
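
With `parametersWithWriteDefaults` rewritten on top of `HoodieConfig`, each `setDefaultValue` call applies the static default and, where one is registered, the infer function. Note also that `mergeParamsAndGetHoodieConfig` now translates SQL options first, so inference sees the final datasource keys. A usage sketch of the expected outcome, mirroring the new test at the end of this patch (the config keys are written out literally for clarity):

```scala
import org.apache.hudi.HoodieWriterUtils

object WriteDefaultsSketch extends App {
  // Only the table name and partition path are set; the rest is defaulted or inferred.
  val merged = HoodieWriterUtils.parametersWithWriteDefaults(Map(
    "hoodie.datasource.write.table.name" -> "hudi_table",
    "hoodie.datasource.write.partitionpath.field" -> "year,month"
  ))
  // Two partition fields, so ComplexKeyGenerator is inferred, and the hive sync
  // table/partition fields are picked up from the write-side options:
  println(merged("hoodie.datasource.write.keygenerator.class"))   // ...ComplexKeyGenerator
  println(merged("hoodie.datasource.hive_sync.table"))            // hudi_table
  println(merged("hoodie.datasource.hive_sync.partition_fields")) // year,month
}
```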
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
index 9372a36f4d78e..b6e595c40a8df 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -26,6 +26,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.NonPartitionedExtractor;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -270,7 +271,9 @@ private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
             DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
             MultiPartKeysValueExtractor.class.getCanonicalName());
       } else {
-        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
+            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+            SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
       }
     }
     return writer;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
index cf86ba7ba3959..8302ece4b9ae9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaGenerateApp.java
@@ -25,6 +25,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.hive.NonPartitionedExtractor;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
@@ -140,7 +141,9 @@ private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
             DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
             MultiPartKeysValueExtractor.class.getCanonicalName());
       } else {
-        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
+            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+            SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
       }
     }
     return writer;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index c047ef19677cf..f1e6b45b292b7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -391,7 +392,9 @@ private DataStreamWriter<Row> updateHiveSyncConfig(DataStreamWriter<Row> writer) {
             DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
             MultiPartKeysValueExtractor.class.getCanonicalName());
       } else {
-        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr");
+        writer = writer.option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS().key(), "dateStr").option(
+            DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS().key(),
+            SlashEncodedDayPartitionValueExtractor.class.getCanonicalName());
       }
     }
     return writer;
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
new file mode 100644
index 0000000000000..d5c3bfa01fc2e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceOptions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, MultiPartKeysValueExtractor}
+import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestDataSourceOptions {
+  @Test def inferDataSourceOptions(): Unit = {
+    val inputOptions1 = Map(
+      TABLE_NAME.key -> "hudi_table",
+      PARTITIONPATH_FIELD.key -> "year,month"
+    )
+    val modifiedOptions1 = HoodieWriterUtils.parametersWithWriteDefaults(inputOptions1)
+    assertEquals(classOf[ComplexKeyGenerator].getName, modifiedOptions1(KEYGENERATOR_CLASS_NAME.key))
+    assertEquals("hudi_table", modifiedOptions1(HIVE_TABLE.key))
+    assertEquals("year,month", modifiedOptions1(HIVE_PARTITION_FIELDS.key))
+    assertEquals(classOf[MultiPartKeysValueExtractor].getName,
+      modifiedOptions1(HIVE_PARTITION_EXTRACTOR_CLASS.key))
+
+    val inputOptions2 = Map(
+      TABLE_NAME.key -> "hudi_table",
+      PARTITIONPATH_FIELD.key -> "year",
+      HIVE_STYLE_PARTITIONING.key -> "true"
+    )
+    val modifiedOptions2 = HoodieWriterUtils.parametersWithWriteDefaults(inputOptions2)
+    assertEquals(classOf[SimpleKeyGenerator].getName, modifiedOptions2(KEYGENERATOR_CLASS_NAME.key))
+    assertEquals("hudi_table", modifiedOptions2(HIVE_TABLE.key))
+    assertEquals("year", modifiedOptions2(HIVE_PARTITION_FIELDS.key))
+    assertEquals(classOf[HiveStylePartitionValueExtractor].getName,
+      modifiedOptions2(HIVE_PARTITION_EXTRACTOR_CLASS.key))
+  }
+}
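
For completeness, the hive partition-extractor inference restated the same way. A single partition field without hive-style partitioning now infers `MultiPartKeysValueExtractor` rather than the old `SlashEncodedDayPartitionValueExtractor` default, which is presumably why the test apps above now pin the extractor explicitly. A standalone sketch (class names real, wrapper illustrative):

```scala
import org.apache.hudi.hive.{HiveStylePartitionValueExtractor, MultiPartKeysValueExtractor, NonPartitionedExtractor}

object ExtractorInferenceSketch extends App {
  // Same branching as hivePartitionExtractorInferFunc:
  //   no partition path                        -> NonPartitionedExtractor
  //   one field and hive-style partitioning on -> HiveStylePartitionValueExtractor
  //   anything else                            -> MultiPartKeysValueExtractor
  def inferExtractor(partitionPathField: Option[String], hiveStylePartitioning: Boolean): String =
    partitionPathField match {
      case None => classOf[NonPartitionedExtractor].getName
      case Some(fields) if fields.split(",").length == 1 && hiveStylePartitioning =>
        classOf[HiveStylePartitionValueExtractor].getName
      case _ => classOf[MultiPartKeysValueExtractor].getName
    }

  assert(inferExtractor(Some("year"), hiveStylePartitioning = true)
    == classOf[HiveStylePartitionValueExtractor].getName)
  assert(inferExtractor(Some("year,month"), hiveStylePartitioning = false)
    == classOf[MultiPartKeysValueExtractor].getName)
}
```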