5 changes: 5 additions & 0 deletions docker/demo/sparksql-incremental.commands
@@ -21,6 +21,7 @@ import org.apache.hudi.DataSourceWriteOptions;
import org.apache.spark.sql.SaveMode;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.HoodieDataSourceHelpers;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hadoop.fs.FileSystem;

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
@@ -49,6 +50,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor");

@@ -79,6 +82,8 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(DataSourceWriteOptions.HIVE_PASS_OPT_KEY, "hive").
option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true").
option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "datestr").
option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName).
option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "true").
mode(SaveMode.Overwrite).
save("/user/hive/warehouse/stock_ticks_derived_mor_bs");

@@ -364,6 +364,8 @@ private[hudi] object HoodieSparkSqlWriter {
hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
hiveSyncConfig.supportTimestamp = parameters.get(HIVE_SUPPORT_TIMESTAMP).exists(r => r.toBoolean)
hiveSyncConfig.decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
hiveSyncConfig
}
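A small sketch of the lookup added above, with the option key and default written out as literal strings for illustration; the literals are assumptions, the real code references the DataSourceWriteOptions constants.

// Assumed literal values standing in for the DataSourceWriteOptions constants
val URL_ENCODE_PARTITIONING_OPT_KEY = "hoodie.datasource.write.partitionpath.urlencode"
val DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL = "false"

val parameters = Map.empty[String, String]   // writer options without the url-encode key
val decodePartition = parameters.getOrElse(URL_ENCODE_PARTITIONING_OPT_KEY,
  DEFAULT_URL_ENCODE_PARTITIONING_OPT_VAL).toBoolean
// decodePartition == false: sync leaves partition values untouched unless the
// writer explicitly enabled URL encoding, which keeps existing tables backward compatible.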

@@ -84,6 +84,9 @@ public class HiveSyncConfig implements Serializable {
+ "Disabled by default for backward compatibility.")
public Boolean supportTimestamp = false;

@Parameter(names = {"--decode-partition"}, description = "Decode the partition value if the partition path was URL-encoded during writing")
public Boolean decodePartition = false;

public static HiveSyncConfig copy(HiveSyncConfig cfg) {
HiveSyncConfig newConfig = new HiveSyncConfig();
newConfig.basePath = cfg.basePath;
@@ -97,15 +100,30 @@ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
newConfig.tableName = cfg.tableName;
newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
newConfig.supportTimestamp = cfg.supportTimestamp;
newConfig.decodePartition = cfg.decodePartition;
return newConfig;
}

@Override
public String toString() {
return "HiveSyncConfig{databaseName='" + databaseName + '\'' + ", tableName='" + tableName + '\''
+ ", hiveUser='" + hiveUser + '\'' + ", hivePass='" + hivePass + '\'' + ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\'' + ", partitionFields=" + partitionFields + ", partitionValueExtractorClass='"
+ partitionValueExtractorClass + '\'' + ", assumeDatePartitioning=" + assumeDatePartitioning + '\'' + ", supportTimestamp='" + supportTimestamp + '\''
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat + ", useJdbc=" + useJdbc + ", help=" + help + '}';
return "HiveSyncConfig{"
+ "databaseName='" + databaseName + '\''
+ ", tableName='" + tableName + '\''
+ ", baseFileFormat='" + baseFileFormat + '\''
+ ", hiveUser='" + hiveUser + '\''
+ ", hivePass='" + hivePass + '\''
+ ", jdbcUrl='" + jdbcUrl + '\''
+ ", basePath='" + basePath + '\''
+ ", partitionFields=" + partitionFields
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ ", assumeDatePartitioning=" + assumeDatePartitioning
+ ", usePreApacheInputFormat=" + usePreApacheInputFormat
+ ", useJdbc=" + useJdbc
+ ", autoCreateDatabase=" + autoCreateDatabase
+ ", skipROSuffix=" + skipROSuffix
+ ", help=" + help
+ ", supportTimestamp=" + supportTimestamp
+ ", decodePartition=" + decodePartition
+ '}';
}
}
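The same switch is exposed on the standalone sync tool as --decode-partition. A hedged sketch of configuring it programmatically, using only fields that appear in this diff; the path and field values are placeholders taken from the demo, and a real sync would also need JDBC URL, credentials and table/database names.

import org.apache.hudi.hive.{HiveSyncConfig, MultiPartKeysValueExtractor}

val cfg = new HiveSyncConfig()
cfg.basePath = "/user/hive/warehouse/stock_ticks_derived_mor"   // placeholder path from the demo
cfg.partitionFields = java.util.Arrays.asList("datestr")
cfg.partitionValueExtractorClass = classOf[MultiPartKeysValueExtractor].getCanonicalName
cfg.decodePartition = true   // reverse the writer-side URL encoding during sync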
@@ -18,6 +18,9 @@

package org.apache.hudi.hive;

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -162,7 +165,17 @@ private String getPartitionClause(String partition) {
+ ". Check partition strategy. ");
List<String> partBuilder = new ArrayList<>();
for (int i = 0; i < syncConfig.partitionFields.size(); i++) {
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValues.get(i) + "'");
String partitionValue = partitionValues.get(i);
// Decode the partition value before syncing to Hive, so an already URL-encoded value is not escaped a second time
if (syncConfig.decodePartition) {
try {
// Reverses the URL encoding applied in KeyGenUtils#getRecordPartitionPath
partitionValue = URLDecoder.decode(partitionValue, StandardCharsets.UTF_8.toString());
} catch (UnsupportedEncodingException e) {
throw new HoodieHiveSyncException("Failed to decode partition value: " + partitionValue, e);
}
}
partBuilder.add("`" + syncConfig.partitionFields.get(i) + "`='" + partitionValue + "'");
}
return String.join(",", partBuilder);
}
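A hedged Scala sketch of the effect of this change on the generated partition clause, mirroring the Java loop above rather than reusing it; the encoded sample value is made up.

import java.net.URLDecoder
import java.nio.charset.StandardCharsets

val partitionFields = Seq("datestr")
val partitionValues = Seq("2018%2F08%2F31")   // value as it appears in the encoded storage path
val clause = partitionFields.zip(partitionValues).map { case (field, value) =>
  // With decodePartition enabled, the encoded path segment is turned back into the raw value
  val decoded = URLDecoder.decode(value, StandardCharsets.UTF_8.toString)
  s"`$field`='$decoded'"
}.mkString(",")
// clause == "`datestr`='2018/08/31'" rather than "`datestr`='2018%2F08%2F31'"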