diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 4c76f5f380b88..4643da5064c19 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -353,6 +353,9 @@ object DataSourceWriteOptions { val HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "hoodie.datasource.hive_sync.ignore_exceptions" val HIVE_SKIP_RO_SUFFIX = "hoodie.datasource.hive_sync.skip_ro_suffix" val HIVE_SUPPORT_TIMESTAMP = "hoodie.datasource.hive_sync.support_timestamp" + val HIVE_TABLE_PROPERTIES = "hoodie.datasource.hive_sync.table_properties" + val HIVE_TABLE_SERDE_PROPERTIES = "hoodie.datasource.hive_sync.serde_properties" + val HIVE_SYNC_AS_DATA_SOURCE_TABLE = "hoodie.datasource.hive_sync.sync_as_datasource" // DEFAULT FOR HIVE SPECIFIC CONFIGS val DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL = "false" @@ -372,6 +375,7 @@ object DataSourceWriteOptions { val DEFAULT_HIVE_IGNORE_EXCEPTIONS_OPT_KEY = "false" val DEFAULT_HIVE_SKIP_RO_SUFFIX_VAL = "false" val DEFAULT_HIVE_SUPPORT_TIMESTAMP = "false" + val DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE = "true" // Async Compaction - Enabled by default for MOR val ASYNC_COMPACT_ENABLE_OPT_KEY = "hoodie.datasource.compaction.async.enable" diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 340ac14436d14..3a5b51e2fafaa 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -36,6 +36,7 @@ import org.apache.hudi.common.util.{CommitUtils, ReflectionUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BOOTSTRAP_BASE_PATH_PROP, BOOTSTRAP_INDEX_CLASS_PROP, DEFAULT_BOOTSTRAP_INDEX_CLASS} import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hive.util.ConfigUtils import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} import org.apache.hudi.internal.DataSourceInternalWriterHelper import org.apache.hudi.sync.common.AbstractSyncTool @@ -44,7 +45,10 @@ import org.apache.spark.SPARK_VERSION import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.SCHEMA_STRING_LENGTH_THRESHOLD +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession} import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer @@ -220,7 +224,8 @@ private[hudi] object HoodieSparkSqlWriter { // Check for errors and commit the write. 
val (writeSuccessful, compactionInstant) = - commitAndPerformPostOperations(writeResult, parameters, writeClient, tableConfig, jsc, + commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, + writeResult, parameters, writeClient, tableConfig, jsc, TableInstantInfo(basePath, instantTime, commitActionType, operation)) def unpersistRdd(rdd: RDD[_]): Unit = { @@ -305,7 +310,7 @@ private[hudi] object HoodieSparkSqlWriter { } finally { writeClient.close() } - val metaSyncSuccess = metaSync(parameters, basePath, jsc.hadoopConfiguration) + val metaSyncSuccess = metaSync(sqlContext.sparkSession, parameters, basePath, df.schema) metaSyncSuccess } @@ -346,12 +351,13 @@ private[hudi] object HoodieSparkSqlWriter { } val hiveSyncEnabled = params.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) val metaSyncEnabled = params.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) - val syncHiveSucess = if (hiveSyncEnabled || metaSyncEnabled) { - metaSync(params, basePath, sqlContext.sparkContext.hadoopConfiguration) + val syncHiveSuccess = + if (hiveSyncEnabled || metaSyncEnabled) { + metaSync(sqlContext.sparkSession, parameters, basePath, df.schema) } else { true } - (syncHiveSucess, common.util.Option.ofNullable(instantTime)) + (syncHiveSuccess, common.util.Option.ofNullable(instantTime)) } def toProperties(params: Map[String, String]): TypedProperties = { @@ -398,7 +404,7 @@ private[hudi] object HoodieSparkSqlWriter { private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = { val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig() hiveSyncConfig.basePath = basePath.toString - hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY); + hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY) hiveSyncConfig.usePreApacheInputFormat = parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean) hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY) @@ -417,17 +423,77 @@ private[hudi] object HoodieSparkSqlWriter { hiveSyncConfig.autoCreateDatabase = parameters.get(HIVE_AUTO_CREATE_DATABASE_OPT_KEY).exists(r => r.toBoolean) hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY, DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean + + val syncAsDataSourceTable = parameters.getOrElse(DataSourceWriteOptions.HIVE_SYNC_AS_DATA_SOURCE_TABLE, + DataSourceWriteOptions.DEFAULT_HIVE_SYNC_AS_DATA_SOURCE_TABLE).toBoolean + if (syncAsDataSourceTable) { + hiveSyncConfig.tableProperties = parameters.getOrElse(HIVE_TABLE_PROPERTIES, null) + hiveSyncConfig.serdeProperties = createSqlTableSerdeProperties(parameters, basePath.toString) + } hiveSyncConfig } - private def metaSync(parameters: Map[String, String], - basePath: Path, - hadoopConf: Configuration): Boolean = { + /** + * Add Spark SQL related table properties to HIVE_TABLE_PROPERTIES. + * @param sqlConf The Spark SQL conf. + * @param schema The schema to write to the table. + * @param parameters The original parameters. + * @return A new parameter map with the HIVE_TABLE_PROPERTIES property added. + */ + private def addSqlTableProperties(sqlConf: SQLConf, schema: StructType, + parameters: Map[String, String]): Map[String, String] = { + // Convert the schema and partition info used by Spark SQL to Hive table properties.
+ // The following code refers to the spark code in + // https://github.com/apache/spark/blob/master/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala + + val partitionSet = parameters(HIVE_PARTITION_FIELDS_OPT_KEY) + .split(",").map(_.trim).filter(!_.isEmpty).toSet + val threshold = sqlConf.getConf(SCHEMA_STRING_LENGTH_THRESHOLD) + + val (partitionCols, dataCols) = schema.partition(c => partitionSet.contains(c.name)) + val reOrderedType = StructType(dataCols ++ partitionCols) + val schemaParts = reOrderedType.json.grouped(threshold).toSeq + + var properties = Map( + "spark.sql.sources.provider" -> "hudi", + "spark.sql.sources.schema.numParts" -> schemaParts.size.toString + ) + schemaParts.zipWithIndex.foreach { case (part, index) => + properties += s"spark.sql.sources.schema.part.$index" -> part + } + // add partition columns + if (partitionSet.nonEmpty) { + properties += "spark.sql.sources.schema.numPartCols" -> partitionSet.size.toString + partitionSet.zipWithIndex.foreach { case (partCol, index) => + properties += s"spark.sql.sources.schema.partCol.$index" -> partCol + } + } + var sqlPropertyText = ConfigUtils.configToString(properties) + sqlPropertyText = if (parameters.containsKey(HIVE_TABLE_PROPERTIES)) { + sqlPropertyText + "\n" + parameters(HIVE_TABLE_PROPERTIES) + } else { + sqlPropertyText + } + parameters + (HIVE_TABLE_PROPERTIES -> sqlPropertyText) + } + + private def createSqlTableSerdeProperties(parameters: Map[String, String], basePath: String): String = { + val pathProp = s"path=$basePath" + if (parameters.containsKey(HIVE_TABLE_SERDE_PROPERTIES)) { + pathProp + "\n" + parameters(HIVE_TABLE_SERDE_PROPERTIES) + } else { + pathProp + } + } + + private def metaSync(spark: SparkSession, parameters: Map[String, String], basePath: Path, + schema: StructType): Boolean = { val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) var metaSyncEnabled = parameters.get(META_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean) var syncClientToolClassSet = scala.collection.mutable.Set[String]() parameters(META_SYNC_CLIENT_TOOL_CLASS).split(",").foreach(syncClass => syncClientToolClassSet += syncClass) + val newParameters = addSqlTableProperties(spark.sessionState.conf, schema, parameters) // for backward compatibility if (hiveSyncEnabled) { metaSyncEnabled = true @@ -435,17 +501,17 @@ private[hudi] object HoodieSparkSqlWriter { } var metaSyncSuccess = true if (metaSyncEnabled) { - val fs = basePath.getFileSystem(hadoopConf) + val fs = basePath.getFileSystem(spark.sessionState.newHadoopConf()) syncClientToolClassSet.foreach(impl => { val syncSuccess = impl.trim match { case "org.apache.hudi.hive.HiveSyncTool" => { log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")") - syncHive(basePath, fs, parameters) + syncHive(basePath, fs, newParameters) true } case _ => { val properties = new Properties(); - properties.putAll(parameters) + properties.putAll(newParameters) properties.put("basePath", basePath.toString) val syncHoodie = ReflectionUtils.loadClass(impl.trim, Array[Class[_]](classOf[Properties], classOf[FileSystem]), properties, fs).asInstanceOf[AbstractSyncTool] syncHoodie.syncHoodieTable() @@ -463,7 +529,9 @@ private[hudi] object HoodieSparkSqlWriter { */ case class TableInstantInfo(basePath: Path, instantTime: String, commitActionType: String, operation: WriteOperationType) - private def commitAndPerformPostOperations(writeResult: HoodieWriteResult, + private def commitAndPerformPostOperations(spark: 
SparkSession, + schema: StructType, + writeResult: HoodieWriteResult, parameters: Map[String, String], client: SparkRDDWriteClient[HoodieRecordPayload[Nothing]], tableConfig: HoodieTableConfig, @@ -497,7 +565,8 @@ private[hudi] object HoodieSparkSqlWriter { } log.info(s"Compaction Scheduled is $compactionInstant") - val metaSyncSuccess = metaSync(parameters, tableInstantInfo.basePath, jsc.hadoopConfiguration()) + + val metaSyncSuccess = metaSync(spark, parameters, tableInstantInfo.basePath, schema) log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled") if (!asyncCompactionEnabled) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala index e9f375ee25efe..606435aca3f88 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala @@ -22,6 +22,7 @@ import java.util import java.util.{Collections, Date, UUID} import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.client.{SparkRDDWriteClient, TestBootstrap} import org.apache.hudi.common.model.{HoodieRecord, HoodieRecordPayload} @@ -29,10 +30,13 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} +import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.testutils.DataSourceTestUtils import org.apache.hudi.{AvroConversionUtils, DataSourceUtils, DataSourceWriteOptions, HoodieSparkSqlWriter, HoodieWriterUtils} import org.apache.spark.SparkContext import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext, SaveMode, SparkSession} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} @@ -486,6 +490,46 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers { } }) + test("Test build sync config for spark sql") { + initSparkContext("test build sync config") + val addSqlTablePropertiesMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("addSqlTableProperties", + classOf[SQLConf], classOf[StructType], classOf[Map[_,_]]) + addSqlTablePropertiesMethod.setAccessible(true) + + val schema = DataSourceTestUtils.getStructTypeExampleSchema + val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + val basePath = "/tmp/hoodie_test" + val params = Map( + "path" -> basePath, + DataSourceWriteOptions.TABLE_NAME_OPT_KEY -> "test_hoodie", + DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition" + ) + val parameters = HoodieWriterUtils.parametersWithWriteDefaults(params) + val newParams = addSqlTablePropertiesMethod.invoke(HoodieSparkSqlWriter, + spark.sessionState.conf, structType, parameters) + .asInstanceOf[Map[String, String]] + + val buildSyncConfigMethod = + HoodieSparkSqlWriter.getClass.getDeclaredMethod("buildSyncConfig", classOf[Path], + classOf[Map[_,_]]) + buildSyncConfigMethod.setAccessible(true) + + val hiveSyncConfig = buildSyncConfigMethod.invoke(HoodieSparkSqlWriter, + new 
Path(basePath), newParams).asInstanceOf[HiveSyncConfig] + + assertResult("spark.sql.sources.provider=hudi\n" + + "spark.sql.sources.schema.partCol.0=partition\n" + + "spark.sql.sources.schema.numParts=1\n" + + "spark.sql.sources.schema.numPartCols=1\n" + + "spark.sql.sources.schema.part.0=" + + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_row_key\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}," + + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}," + + "{\"name\":\"partition\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}")(hiveSyncConfig.tableProperties) + + assertResult("path=/tmp/hoodie_test")(hiveSyncConfig.serdeProperties) + } + case class Test(uuid: String, ts: Long) import scala.collection.JavaConverters diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java index 140938f7046a1..786c7208513ba 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/DLASyncTool.java @@ -19,6 +19,7 @@ package org.apache.hudi.dla; import com.beust.jcommander.JCommander; +import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; @@ -149,14 +150,14 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi if (!useRealTimeInputFormat) { String inputFormatClassName = HoodieParquetInputFormat.class.getName(); hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), - ParquetHiveSerDe.class.getName()); + ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); } else { // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 String inputFormatClassName = HoodieParquetRealtimeInputFormat.class.getName(); hoodieDLAClient.createTable(tableName, schema, inputFormatClassName, MapredParquetOutputFormat.class.getName(), - ParquetHiveSerDe.class.getName()); + ParquetHiveSerDe.class.getName(), new HashMap<>(), new HashMap<>()); } } else { // Check if the table schema has evolved diff --git a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java index 02c07d6e5861f..c5f1a7cf459e8 100644 --- a/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java +++ b/hudi-sync/hudi-dla-sync/src/main/java/org/apache/hudi/dla/HoodieDLAClient.java @@ -101,9 +101,12 @@ private void createDLAConnection() { } @Override - public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, + String outputFormatClass, String serdeClass, + Map serdeProperties, Map tableProperties) { try { - String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), inputFormatClass, outputFormatClass, serdeClass); + String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), + inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties); LOG.info("Creating table with " + 
createSQLQuery); updateDLASQL(createSQLQuery); } catch (IOException e) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java index 0063d15affd1d..e4e796295376e 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java @@ -88,6 +88,12 @@ public class HiveSyncConfig implements Serializable { @Parameter(names = {"--verify-metadata-file-listing"}, description = "Verify file listing from Hudi's metadata against file system") public Boolean verifyMetadataFileListing = HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE; + @Parameter(names = {"--table-properties"}, description = "Table properties to hive table") + public String tableProperties; + + @Parameter(names = {"--serde-properties"}, description = "Serde properties to hive table") + public String serdeProperties; + @Parameter(names = {"--help", "-h"}, help = true) public Boolean help = false; @@ -114,32 +120,36 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) { newConfig.verifyMetadataFileListing = cfg.verifyMetadataFileListing; newConfig.supportTimestamp = cfg.supportTimestamp; newConfig.decodePartition = cfg.decodePartition; + newConfig.tableProperties = cfg.tableProperties; + newConfig.serdeProperties = cfg.serdeProperties; return newConfig; } @Override public String toString() { return "HiveSyncConfig{" - + "databaseName='" + databaseName + '\'' - + ", tableName='" + tableName + '\'' - + ", baseFileFormat='" + baseFileFormat + '\'' - + ", hiveUser='" + hiveUser + '\'' - + ", hivePass='" + hivePass + '\'' - + ", jdbcUrl='" + jdbcUrl + '\'' - + ", basePath='" + basePath + '\'' - + ", partitionFields=" + partitionFields - + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\'' - + ", assumeDatePartitioning=" + assumeDatePartitioning - + ", usePreApacheInputFormat=" + usePreApacheInputFormat - + ", useJdbc=" + useJdbc - + ", autoCreateDatabase=" + autoCreateDatabase - + ", ignoreExceptions=" + ignoreExceptions - + ", skipROSuffix=" + skipROSuffix - + ", help=" + help - + ", supportTimestamp=" + supportTimestamp - + ", decodePartition=" + decodePartition - + ", useFileListingFromMetadata=" + useFileListingFromMetadata - + ", verifyMetadataFileListing=" + verifyMetadataFileListing - + '}'; + + "databaseName='" + databaseName + '\'' + + ", tableName='" + tableName + '\'' + + ", baseFileFormat='" + baseFileFormat + '\'' + + ", hiveUser='" + hiveUser + '\'' + + ", hivePass='" + hivePass + '\'' + + ", jdbcUrl='" + jdbcUrl + '\'' + + ", basePath='" + basePath + '\'' + + ", partitionFields=" + partitionFields + + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\'' + + ", assumeDatePartitioning=" + assumeDatePartitioning + + ", usePreApacheInputFormat=" + usePreApacheInputFormat + + ", useJdbc=" + useJdbc + + ", autoCreateDatabase=" + autoCreateDatabase + + ", ignoreExceptions=" + ignoreExceptions + + ", skipROSuffix=" + skipROSuffix + + ", useFileListingFromMetadata=" + useFileListingFromMetadata + + ", verifyMetadataFileListing=" + verifyMetadataFileListing + + ", tableProperties='" + tableProperties + '\'' + + ", serdeProperties='" + serdeProperties + '\'' + + ", help=" + help + + ", supportTimestamp=" + supportTimestamp + + ", decodePartition=" + decodePartition + + '}'; } } diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index bbda97efd10aa..18d133b6a339a 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -24,6 +24,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.InvalidTableException; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; +import org.apache.hudi.hive.util.ConfigUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; import org.apache.hudi.hive.util.HiveSchemaUtil; @@ -162,9 +163,9 @@ private void syncHoodieTable(String tableName, boolean useRealtimeInputFormat) { LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced); LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); + // Sync the partitions if needed syncPartitions(tableName, writtenPartitionsSince); - hoodieHiveClient.updateLastCommitTimeSynced(tableName); LOG.info("Sync complete for " + tableName); } @@ -196,7 +197,8 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi // Custom serde will not work with ALTER TABLE REPLACE COLUMNS // https://github.com/apache/hive/blob/release-1.1.0/ql/src/java/org/apache/hadoop/hive // /ql/exec/DDLTask.java#L3488 - hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, outputFormatClassName, serDeFormatClassName); + hoodieHiveClient.createTable(tableName, schema, inputFormatClassName, + outputFormatClassName, serDeFormatClassName, ConfigUtils.toMap(cfg.serdeProperties), ConfigUtils.toMap(cfg.tableProperties)); } else { // Check if the table schema has evolved Map<String, String> tableSchema = hoodieHiveClient.getTableSchema(tableName); @@ -204,6 +206,12 @@ private void syncSchema(String tableName, boolean tableExists, boolean useRealTi if (!schemaDiff.isEmpty()) { LOG.info("Schema difference found for " + tableName); hoodieHiveClient.updateTableDefinition(tableName, schema); + // Sync the table properties if the schema has changed + if (cfg.tableProperties != null) { + Map<String, String> tableProperties = ConfigUtils.toMap(cfg.tableProperties); + hoodieHiveClient.updateTableProperties(tableName, tableProperties); + LOG.info("Synced table properties for " + tableName + ", table properties: " + cfg.tableProperties); + } } else { LOG.info("No Schema difference for " + tableName); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java index b6211671b5119..aa7719aee2466 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java @@ -138,6 +138,26 @@ public void updatePartitionsToTable(String tableName, List changedPartit } } + /** + * Update the table properties for the given table.
+ */ + @Override + public void updateTableProperties(String tableName, Map tableProperties) { + if (tableProperties == null || tableProperties.isEmpty()) { + return; + } + try { + Table table = client.getTable(syncConfig.databaseName, tableName); + for (Map.Entry entry: tableProperties.entrySet()) { + table.putToParameters(entry.getKey(), entry.getValue()); + } + client.alter_table(syncConfig.databaseName, tableName, table); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to update table properties for table: " + + tableName, e); + } + } + private String constructAddPartitions(String tableName, List partitions) { StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); alterSQL.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.databaseName) @@ -255,10 +275,13 @@ void updateTableDefinition(String tableName, MessageType newSchema) { } @Override - public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, String outputFormatClass, String serdeClass) { + public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, + String outputFormatClass, String serdeClass, + Map serdeProperties, Map tableProperties) { try { String createSQLQuery = - HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass); + HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, syncConfig, inputFormatClass, + outputFormatClass, serdeClass, serdeProperties, tableProperties); LOG.info("Creating table with " + createSQLQuery); updateHiveSQL(createSQLQuery); } catch (IOException e) { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java new file mode 100644 index 0000000000000..8c9dfb636b221 --- /dev/null +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/ConfigUtils.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hive.util; + +import java.util.HashMap; +import java.util.Map; +import org.apache.hudi.common.util.StringUtils; + +public class ConfigUtils { + + /** + * Convert the key-value config to a map.The format of the config + * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3". 
+ * @param keyValueConfig The key-value config string. + * @return The config as a map of key-value pairs. + */ + public static Map<String, String> toMap(String keyValueConfig) { + if (StringUtils.isNullOrEmpty(keyValueConfig)) { + return new HashMap<>(); + } + String[] keyvalues = keyValueConfig.split("\n"); + Map<String, String> tableProperties = new HashMap<>(); + for (String keyValue : keyvalues) { + String[] keyValueArray = keyValue.split("="); + if (keyValueArray.length == 1 || keyValueArray.length == 2) { + String key = keyValueArray[0].trim(); + String value = keyValueArray.length == 2 ? keyValueArray[1].trim() : ""; + tableProperties.put(key, value); + } else { + throw new IllegalArgumentException("Bad key-value config: " + keyValue + ", must be in the" + + " format 'key=value'"); + } + } + return tableProperties; + } + + /** + * Convert a map config to a key-value string. The format of the config + * is a key-value pair just like "k1=v1\nk2=v2\nk3=v3". + * @param config The config map. + * @return The config as a key-value string. + */ + public static String configToString(Map<String, String> config) { + if (config == null) { + return null; + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry<String, String> entry : config.entrySet()) { + if (sb.length() > 0) { + sb.append("\n"); + } + sb.append(entry.getKey()).append("=").append(entry.getValue()); + } + return sb.toString(); + } + +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java index 6a209bef7c3ec..d4cdfc09cc81d 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/HiveSchemaUtil.java @@ -395,7 +395,8 @@ public static String generateSchemaString(MessageType storageSchema, List serdeProperties, + Map tableProperties) throws IOException { Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp); String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp); @@ -415,8 +416,31 @@ public static String generateCreateDDL(String tableName, MessageType storageSche sb.append(" PARTITIONED BY (").append(partitionsStr).append(")"); } sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); + if (serdeProperties != null && !serdeProperties.isEmpty()) { + sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")"); + } sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'"); + + if (tableProperties != null && !tableProperties.isEmpty()) { + sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")"); + } + return sb.toString(); + } + + private static String propertyToString(Map<String, String> properties) { + if (properties == null || properties.isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + boolean first = true; + for (Map.Entry<String, String> entry : properties.entrySet()) { + if (!first) { + sb.append(","); + } + sb.append("'").append(entry.getKey()).append("'='").append(entry.getValue()).append("'"); + first = false; + } return sb.toString(); } diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index 75ba97c835897..300e9378a75f1 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -18,13 +18,19 @@ package org.apache.hudi.hive; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; -import org.apache.hudi.hive.testutils.HiveTestUtil; -import org.apache.hudi.hive.util.HiveSchemaUtil; +import org.apache.hudi.hive.util.ConfigUtils; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent; import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType; +import org.apache.hudi.hive.testutils.HiveTestUtil; +import org.apache.hudi.hive.util.HiveSchemaUtil; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.parquet.schema.MessageType; @@ -250,6 +256,54 @@ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) "The last commit that was synced should be 100"); } + @ParameterizedTest + @MethodSource({"useJdbcAndSchemaFromCommitMetadata"}) + public void testSyncWithProperties(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception { + HiveSyncConfig hiveSyncConfig = HiveTestUtil.hiveSyncConfig; + Map serdeProperties = new HashMap() { + { + put("path", hiveSyncConfig.basePath); + } + }; + + Map tableProperties = new HashMap() { + { + put("tp_0", "p0"); + put("tp_1", "p1"); + } + }; + hiveSyncConfig.useJdbc = useJdbc; + hiveSyncConfig.serdeProperties = ConfigUtils.configToString(serdeProperties); + hiveSyncConfig.tableProperties = ConfigUtils.configToString(tableProperties); + String instantTime = "100"; + HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata); + + HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem); + tool.syncHoodieTable(); + + SessionState.start(HiveTestUtil.getHiveConf()); + Driver hiveDriver = new org.apache.hadoop.hive.ql.Driver(HiveTestUtil.getHiveConf()); + String dbTableName = hiveSyncConfig.databaseName + "." 
+ hiveSyncConfig.tableName; + hiveDriver.run("SHOW TBLPROPERTIES " + dbTableName); + List<String> results = new ArrayList<>(); + hiveDriver.getResults(results); + String tblPropertiesWithoutDdlTime = String.join("\n", + results.subList(0, results.size() - 1)); + assertEquals( + "EXTERNAL\tTRUE\n" + + "last_commit_time_sync\t100\n" + + "tp_0\tp0\n" + + "tp_1\tp1", tblPropertiesWithoutDdlTime); + assertTrue(results.get(results.size() - 1).startsWith("transient_lastDdlTime")); + + results.clear(); + hiveDriver.run("SHOW CREATE TABLE " + dbTableName); + hiveDriver.getResults(results); + String ddl = String.join("\n", results); + assertTrue(ddl.contains("'path'='" + hiveSyncConfig.basePath + "'")); + } + @ParameterizedTest @MethodSource("useJdbc") public void testSyncIncremental(boolean useJdbc) throws Exception { diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java index 8477ed6cec222..f9ada2fa829a8 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/AbstractSyncHoodieClient.java @@ -62,8 +62,20 @@ public AbstractSyncHoodieClient(String basePath, boolean assumeDatePartitioning, this.fs = fs; } + /** + * Create the table. + * @param tableName The table name. + * @param storageSchema The table schema. + * @param inputFormatClass The input format class of this table. + * @param outputFormatClass The output format class of this table. + * @param serdeClass The serde class of this table. + * @param serdeProperties The serde properties of this table. + * @param tableProperties The table properties for this table. + */ public abstract void createTable(String tableName, MessageType storageSchema, - String inputFormatClass, String outputFormatClass, String serdeClass); + String inputFormatClass, String outputFormatClass, + String serdeClass, Map<String, String> serdeProperties, + Map<String, String> tableProperties); public abstract boolean doesTableExist(String tableName); @@ -75,6 +87,8 @@ public abstract void createTable(String tableName, MessageType storageSchema, public abstract void updatePartitionsToTable(String tableName, List<String> changedPartitions); + public void updateTableProperties(String tableName, Map<String, String> tableProperties) {} + public abstract Map<String, String> getTableSchema(String tableName); public HoodieTableType getTableType() { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 8d837fd0972be..f9d016214844c 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -236,6 +236,13 @@ public static void initClass() throws Exception { prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT); } + @AfterAll + public static void release() { + if (testUtils != null) { + testUtils.teardown(); + } + } + private static void populateInvalidTableConfigFilePathProps(TypedProperties props) { props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName()); props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
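
For reference, a minimal sketch of how the new write options surface to users of the datasource writer. The DataFrame `df`, the target path, the table name and the property values below are illustrative, and the remaining Hive sync settings (JDBC URL, database, partition fields, ...) are assumed to be configured as usual:

  import org.apache.hudi.DataSourceWriteOptions._
  import org.apache.spark.sql.SaveMode

  // `df` is an arbitrary DataFrame in scope; all option values are examples only.
  df.write.format("hudi")
    .option("hoodie.table.name", "test_hoodie")
    .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
    // New: register the table in Hive as a Spark datasource table (defaults to "true").
    .option(HIVE_SYNC_AS_DATA_SOURCE_TABLE, "true")
    // New: extra TBLPROPERTIES / SERDEPROPERTIES, passed as newline-separated "k=v" pairs.
    .option(HIVE_TABLE_PROPERTIES, "tp_0=p0\ntp_1=p1")
    .option(HIVE_TABLE_SERDE_PROPERTIES, "serde_0=v0")
    .mode(SaveMode.Append)
    .save("/tmp/hoodie_test")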
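The `--table-properties` / `--serde-properties` values (and the corresponding datasource options above) share one wire format, defined by the new ConfigUtils helper: newline-separated key=value pairs. A small round-trip sketch with made-up values:

  import org.apache.hudi.hive.util.ConfigUtils
  import scala.collection.JavaConverters._

  // Map -> "k=v" lines; entry order follows the map's iteration order.
  val asString = ConfigUtils.configToString(Map("tp_0" -> "p0", "tp_1" -> "p1").asJava)
  // asString == "tp_0=p0\ntp_1=p1"
  // "k=v" lines -> Map; a line without '=' maps to an empty value,
  // and a line with more than one '=' is rejected with IllegalArgumentException.
  val asMap = ConfigUtils.toMap(asString)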
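When hoodie.datasource.hive_sync.sync_as_datasource is enabled, the writer folds standard Spark datasource-table metadata into the synced table properties, which lets Spark SQL resolve the Hive table as a `hudi` datasource table rather than a plain Hive table. Roughly, for a made-up three-field schema with one partition column (the real addSqlTableProperties moves partition columns to the end of the schema and splits the schema JSON into spark.sql.sources.schema.part.<i> chunks no longer than Spark's SCHEMA_STRING_LENGTH_THRESHOLD):

  import org.apache.spark.sql.types._

  val schema = StructType(Seq(
    StructField("_row_key", StringType, nullable = false),
    StructField("ts", LongType),
    StructField("partition", StringType, nullable = false)))

  // Illustration of the properties the sync would carry for this schema.
  val sparkTableProps = Map(
    "spark.sql.sources.provider" -> "hudi",
    "spark.sql.sources.schema.numParts" -> "1",
    "spark.sql.sources.schema.part.0" -> schema.json,
    "spark.sql.sources.schema.numPartCols" -> "1",
    "spark.sql.sources.schema.partCol.0" -> "partition")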
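For the standalone sync path, the two new HiveSyncConfig fields can be set programmatically (or through the new --table-properties / --serde-properties flags) before running HiveSyncTool. A minimal sketch mirroring the new test, where `hiveConf` and `fs` are assumed to be an existing HiveConf and FileSystem and the usual sync settings (jdbcUrl, partition fields, ...) are filled in as for any other run:

  import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
  import org.apache.hudi.hive.util.ConfigUtils
  import scala.collection.JavaConverters._

  val cfg = new HiveSyncConfig()
  cfg.databaseName = "default"
  cfg.tableName = "test_hoodie"
  cfg.basePath = "/tmp/hoodie_test"
  // Carried into the CREATE TABLE DDL as TBLPROPERTIES / WITH SERDEPROPERTIES.
  cfg.tableProperties = ConfigUtils.configToString(Map("tp_0" -> "p0", "tp_1" -> "p1").asJava)
  cfg.serdeProperties = ConfigUtils.configToString(Map("path" -> cfg.basePath).asJava)
  new HiveSyncTool(cfg, hiveConf, fs).syncHoodieTable()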