diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index 27846e15df096..8a94ac675a2a8 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{DataType, NullType, StringType, StructField, StructType}
 
 import scala.collection.immutable.Map
@@ -171,10 +171,6 @@ object HoodieSqlUtils extends SparkAdapterSupport {
   /**
    * Append the SparkSession config and table options to the baseConfig.
    * We add the "spark" prefix to hoodie's config key.
-   * @param spark
-   * @param options
-   * @param baseConfig
-   * @return
    */
   def withSparkConf(spark: SparkSession, options: Map[String, String])
                    (baseConfig: Map[String, String]): Map[String, String] = {
@@ -184,4 +180,7 @@ object HoodieSqlUtils extends SparkAdapterSupport {
   }
 
   def isSpark3: Boolean = SPARK_VERSION.startsWith("3.")
+
+  def isEnableHive(sparkSession: SparkSession): Boolean =
+    "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
index bc25b1f3824a7..8aabe00086193 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
@@ -155,7 +155,7 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
     validateTable(newTable)
 
     // Create table in the catalog
-    val enableHive = "hive" == sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+    val enableHive = isEnableHive(sparkSession)
     if (enableHive) {
       createHiveDataSourceTable(newTable, sparkSession)
     } else {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
index 6bfb5275f4b2e..d2e72ccc5fc68 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
@@ -66,10 +66,9 @@ case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends Runnab
 
     assert(primaryColumns.nonEmpty,
       s"There are no primary key defined in table $tableId, cannot execute delete operator")
-
     withSparkConf(sparkSession, targetTable.storage.properties) {
       Map(
-        "path" -> path.toString,
+        "path" -> path,
        KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
        TABLE_NAME.key -> tableId.table,
        OPERATION_OPT_KEY.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 2266d0415fc6c..6ce3070277c3f 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.Properties
-
 import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
@@ -232,7 +231,7 @@ object InsertIntoHoodieTableCommand {
     } else {
       classOf[DefaultHoodieRecordPayload].getCanonicalName
     }
-
+    val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, options) {
       Map(
         "path" -> path,
@@ -244,7 +243,7 @@ object InsertIntoHoodieTableCommand {
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
         PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
-        META_SYNC_ENABLED_OPT_KEY.key -> "true",
+        META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
         HIVE_USE_JDBC_OPT_KEY.key -> "false",
         HIVE_DATABASE_OPT_KEY.key -> table.identifier.database.getOrElse("default"),
         HIVE_TABLE_OPT_KEY.key -> table.identifier.table,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 0c6fe77f2402b..008d0260d7a67 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hudi.command
 
 import java.util.Base64
-
 import org.apache.avro.Schema
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.config.HoodieWriteConfig
@@ -426,7 +425,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
       throw new IllegalArgumentException(s"Merge Key[${targetKey2SourceExpression.keySet.mkString(",")}] is not" +
         s" Equal to the defined primary key[${definedPk.mkString(",")}] in table $targetTableName")
     }
-
+    // Enable hive sync by default if Spark has the Hive metastore enabled.
+    val enableHive = isEnableHive(sparkSession)
     HoodieWriterUtils.parametersWithWriteDefaults(
       withSparkConf(sparkSession, options) {
         Map(
@@ -437,7 +437,7 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Runnab
           TABLE_NAME.key -> targetTableName,
           PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
           PAYLOAD_CLASS_OPT_KEY.key -> classOf[ExpressionPayload].getCanonicalName,
-          META_SYNC_ENABLED_OPT_KEY.key -> "true",
+          META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
           HIVE_USE_JDBC_OPT_KEY.key -> "false",
           HIVE_DATABASE_OPT_KEY.key -> targetTableDb,
           HIVE_TABLE_OPT_KEY.key -> targetTableName,
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
index af3bcdefc2b8d..73addacee232a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
@@ -93,16 +93,17 @@ case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends RunnableCo
     assert(primaryColumns.nonEmpty,
       s"There are no primary key in table $tableId, cannot execute update operator")
 
+    val enableHive = isEnableHive(sparkSession)
     withSparkConf(sparkSession, targetTable.storage.properties) {
       Map(
-        "path" -> path.toString,
+        "path" -> path,
         RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
         KEYGENERATOR_CLASS_OPT_KEY.key -> classOf[SqlKeyGenerator].getCanonicalName,
         PRECOMBINE_FIELD_OPT_KEY.key -> primaryColumns.head, //set the default preCombine field.
         TABLE_NAME.key -> tableId.table,
         OPERATION_OPT_KEY.key -> DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
         PARTITIONPATH_FIELD_OPT_KEY.key -> targetTable.partitionColumnNames.mkString(","),
-        META_SYNC_ENABLED_OPT_KEY.key -> "false", // TODO make the meta sync enable by default.
+        META_SYNC_ENABLED_OPT_KEY.key -> enableHive.toString,
         HIVE_USE_JDBC_OPT_KEY.key -> "false",
         HIVE_DATABASE_OPT_KEY.key -> tableId.database.getOrElse("default"),
         HIVE_TABLE_OPT_KEY.key -> tableId.table,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
index 067e49af46f19..c8291db83af84 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieSqlBase.scala
@@ -37,7 +37,6 @@ class TestHoodieSqlBase extends FunSuite with BeforeAndAfterAll {
     .appName("hoodie sql test")
    .withExtensions(new HoodieSparkSessionExtension)
     .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    .config("hoodie.datasource.meta.sync.enable", "false")
    .config("hoodie.insert.shuffle.parallelism", "4")
     .config("hoodie.upsert.shuffle.parallelism", "4")
     .config("hoodie.delete.shuffle.parallelism", "4")
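A minimal sketch (not part of the patch) of how the new `HoodieSqlUtils.isEnableHive` helper behaves: the object name `IsEnableHiveCheck` and the local-mode session are illustrative, and `enableHiveSupport()` assumes Spark's Hive classes are on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.hudi.HoodieSqlUtils

object IsEnableHiveCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("isEnableHive check")
      // Sets the static conf spark.sql.catalogImplementation to "hive";
      // requires Hive support on the classpath.
      .enableHiveSupport()
      .getOrCreate()

    // When this prints true, the SQL commands above resolve
    // hoodie.datasource.meta.sync.enable to "true"; otherwise to "false".
    println(HoodieSqlUtils.isEnableHive(spark))
    spark.stop()
  }
}
```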