diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f321cdf158965..c3f6fffdb53f4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -36,6 +36,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.hudi.index.SparkHoodieIndexFactory
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
@@ -45,6 +46,7 @@ import org.apache.hudi.table.BulkInsertPartitioner
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hudi.HoodieOptionConfig
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext, SaveMode, SparkSession}
@@ -577,6 +579,7 @@ object HoodieSparkSqlWriter {
     var metaSyncEnabled = hoodieConfig.getStringOrDefault(META_SYNC_ENABLED).toBoolean
     var syncClientToolClassSet = scala.collection.mutable.Set[String]()
     hoodieConfig.getString(META_SYNC_CLIENT_TOOL_CLASS_NAME).split(",").foreach(syncClass => syncClientToolClassSet += syncClass)
+    hoodieConfig.setValue(HIVE_TABLE_SERDE_PROPERTIES, getHiveTableSerdeProperties(hoodieConfig))

     // for backward compatibility
     if (hiveSyncEnabled) {
@@ -608,6 +611,24 @@ object HoodieSparkSqlWriter {
     metaSyncSuccess
   }

+  private def getHiveTableSerdeProperties(hoodieConfig: HoodieConfig): String = {
+    val serdeOption = new java.util.HashMap[String, String]
+
+    if (hoodieConfig.getString(RECORDKEY_FIELD) != null) {
+      serdeOption.put(HoodieOptionConfig.SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName, hoodieConfig.getString(RECORDKEY_FIELD))
+    }
+
+    if (hoodieConfig.getString(PRECOMBINE_FIELD) != null) {
+      serdeOption.put(HoodieOptionConfig.SQL_KEY_PRECOMBINE_FIELD.sqlKeyName, hoodieConfig.getString(PRECOMBINE_FIELD))
+    }
+
+    if (hoodieConfig.getString(TABLE_TYPE) != null) {
+      serdeOption.put(HoodieOptionConfig.SQL_KEY_TABLE_TYPE.sqlKeyName, HoodieOptionConfig.reverseValueMapping(hoodieConfig.getString(TABLE_TYPE)))
+    }
+
+    ConfigUtils.configToString(serdeOption)
+  }
+
   /**
    * Group all table/action specific information into a case class.
    */
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index e3388e221a972..1e810e10e5ba1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -104,7 +104,7 @@ object HoodieOptionConfig {
     SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
   )

-  private lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)
+  lazy val reverseValueMapping = valueMapping.map(f => f._2 -> f._1)

   def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] = defaultSqlOptions ++ options
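For illustration only, a minimal self-contained sketch of the translation getHiveTableSerdeProperties performs: write options (record key, precombine field, table type) become SQL-style serde properties, with the datasource table-type value mapped back to its SQL form as reverseValueMapping does above. The key names (primaryKey, preCombineField, type) and the newline-separated key=value serialization are assumptions about HoodieOptionConfig and ConfigUtils.configToString, not part of this patch; SerdePropertiesSketch is a hypothetical helper.

    // Hypothetical sketch, not the Hudi implementation.
    object SerdePropertiesSketch {
      def toSerdeProperties(recordKey: Option[String],
                            precombineField: Option[String],
                            tableType: Option[String]): String = {
        val opts = Seq(
          // Assumed SQL option key names for the table properties.
          recordKey.map(v => "primaryKey" -> v),
          precombineField.map(v => "preCombineField" -> v),
          // Map the datasource value (e.g. MERGE_ON_READ) back to the SQL value (mor),
          // mirroring what reverseValueMapping is exposed for in this patch.
          tableType.map(t => "type" -> (if (t == "COPY_ON_WRITE") "cow" else "mor"))
        ).flatten
        // Assumed serialization: one key=value pair per line.
        opts.map { case (k, v) => s"$k=$v" }.mkString("\n")
      }

      def main(args: Array[String]): Unit = {
        // Prints: primaryKey=uuid, preCombineField=ts, type=mor (one per line)
        println(toSerdeProperties(Some("uuid"), Some("ts"), Some("MERGE_ON_READ")))
      }
    }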