diff --git a/docker/demo/sparksql-incremental.commands b/docker/demo/sparksql-incremental.commands
index febfcd28a1116..6409cc747a4be 100644
--- a/docker/demo/sparksql-incremental.commands
+++ b/docker/demo/sparksql-incremental.commands
@@ -21,6 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.spark.sql.SaveMode;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.HoodieDataSourceHelpers;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hadoop.fs.FileSystem;
 
 val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
@@ -49,6 +50,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
     option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
     option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
     option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
+    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
+    option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
     mode(SaveMode.Overwrite).
     save("/user/hive/warehouse/stock_ticks_derived_mor");
 
@@ -79,6 +82,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
     option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
     option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
     option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
+    option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
+    option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
     mode(SaveMode.Overwrite).
     save("/user/hive/warehouse/stock_ticks_derived_mor_bs");
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index d66103600f9a1..b7fcf819a2db2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -364,6 +364,8 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
     hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
     hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
+    hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
+      DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
     hiveSyncConfig
   }
 
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index fa223980d663e..6c8fd8f916483 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -84,6 +84,9 @@ public class HiveSyncConfig implements Serializable {
       + "Disabled by default for backward compatibility.")
   public Boolean supportTimestamp = false;
 
+  @Parameter(names = {"--decode-partition"}, description = "Decode the partition value if it was url-encoded during writing")
+  public Boolean decodePartition = false;
+
   public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     HiveSyncConfig newConfig = new HiveSyncConfig();
     newConfig.basePath = cfg.basePath;
@@ -97,15 +100,30 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
     newConfig.tableName = cfg.tableName;
     newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
     newConfig.supportTimestamp = cfg.supportTimestamp;
+    newConfig.decodePartition = cfg.decodePartition;
     return newConfig;
   }
 
   @Override
   public String toString() {
-    return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
-        + ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
-        + ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
-        + partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\''
-        + ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
+    return "HiveSyncConfig{"
+        + "databaseName='" + databaseName + '\''
+        + ", tableName='" + tableName + '\''
+        + ", baseFileFormat='" + baseFileFormat + '\''
+        + ", hiveUser='" + hiveUser + '\''
+        + ", hivePass='" + hivePass + '\''
+        + ", jdbcUrl='" + jdbcUrl + '\''
+        + ", basePath='" + basePath + '\''
+        + ", partitionFields=" + partitionFields
+        + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+        + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+        + ", useJdbc=" + useJdbc
+        + ", autoCreateDatabase=" + autoCreateDatabase
+        + ", skipROSuffix=" + skipROSuffix
+        + ", help=" + help
+        + ", supportTimestamp=" + supportTimestamp
+        + ", decodePartition=" + decodePartition
+        + '}';
   }
 }
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 6d85395d3f6e5..88f4c10611b08 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -18,6 +18,9 @@
 
 package org.apache.hudi.hive;
 
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -162,7 +165,17 @@ private String getPartitionClause(String partition) {
         + ". Check partition strategy. ");
     List<String> partBuilder = new ArrayList<>();
     for (int i = 0; i < syncConfig.partitionFields.size(); i++) {
-      partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
+      String partitionValue = partitionValues.get(i);
+      // Decode the partition value before syncing to Hive, so Hive does not escape it a second time
+      if (syncConfig.decodePartition) {
+        try {
+          // Reverses the encoding applied in KeyGenUtils#getRecordPartitionPath
+          partitionValue = URLDecoder.decode(partitionValue, StandardCharsets.UTF_8.toString());
+        } catch (UnsupportedEncodingException e) {
+          throw new HoodieHiveSyncException("Failed to decode partition: " + partitionValue, e);
+        }
+      }
+      partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValue + "'");
     }
     return String.join(",", partBuilder);
   }
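
For reference, a minimal standalone sketch (not part of the patch) of the decode step that the modified HoodieHiveClient#getPartitionClause performs when --decode-partition is enabled. The class name and the sample partition value are hypothetical, chosen only to illustrate that URLDecoder reverses the percent-encoding applied to partition paths at write time:

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

public class PartitionDecodeSketch {
  public static void main(String[] args) throws UnsupportedEncodingException {
    // Hypothetical partition value as it appears on storage when
    // URL_ENCODE_PARTITIONING_OPT_KEY was set to "true" at write time.
    String encoded = "2018%2F08%2F31";
    // Same decode call as in the patched getPartitionClause, applied before
    // the value is embedded into the Hive partition clause.
    String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.toString());
    System.out.println(decoded); // prints: 2018/08/31
  }
}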