diff --git a/docker/demo/config/hoodie-incr.properties b/docker/demo/config/hoodie-incr.properties
index 80f474b1e7716..c46ec48a40184 100644
--- a/docker/demo/config/hoodie-incr.properties
+++ b/docker/demo/config/hoodie-incr.properties
@@ -28,5 +28,6 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
 hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
 # hive sync
 hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
-hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
-hoodie.datasource.hive_sync.partition_fields=partition
\ No newline at end of file
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.partition_fields=partition
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index f142ebd502f15..36000e9cb86e8 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -92,7 +92,8 @@ public class ITTestHoodieDemo extends ITTestBase {
   private HoodieFileFormat baseFileFormat;
 
   private static String HIVE_SYNC_CMD_FMT =
-      " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
+      " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ "
+          + " --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor "
           + " --hoodie-conf hoodie.datasource.hive_sync.username=hive "
           + " --hoodie-conf hoodie.datasource.hive_sync.password=hive "
           + " --hoodie-conf hoodie.datasource.hive_sync.partition_fields=%s "
@@ -215,6 +216,7 @@ private void ingestFirstBatchAndHiveSync() throws Exception {
             + " --user hive"
             + " --pass hive"
             + " --jdbc-url jdbc:hive2://hiveserver:10000"
+            + " --partition-value-extractor org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor"
             + " --partitioned-by dt",
         ("spark-submit"
             + " --conf \'spark.executor.extraJavaOptions=-Dlog4jspark.root.logger=WARN,console\'"
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index b763416e8f255..0374686b7166b 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -91,7 +91,7 @@ public HiveSyncTool(Properties props, Configuration hadoopConf) {
     HiveSyncConfig config = new HiveSyncConfig(props, hadoopConf);
     this.config = config;
     this.databaseName = config.getStringOrDefault(META_SYNC_DATABASE_NAME);
-    this.tableName = config.getString(META_SYNC_TABLE_NAME);
+    this.tableName = config.getStringOrDefault(META_SYNC_TABLE_NAME);
     initSyncClient(config);
     initTableNameVars(config);
   }
@@ -109,6 +109,7 @@ protected void initSyncClient(HiveSyncConfig config) {
   }
 
   private void initTableNameVars(HiveSyncConfig config) {
+    final String tableName = config.getStringOrDefault(META_SYNC_TABLE_NAME);
     if (syncClient != null) {
       switch (syncClient.getTableType()) {
         case COPY_ON_WRITE:
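Note on the HiveSyncTool hunks above: swapping getString for getStringOrDefault means the table name is now resolved the same way as the database name — an explicitly set value first, then the property's inferred or declared default. A minimal sketch of that resolution order, assuming a simplified explicit-then-inferred-then-default lookup (the helper and keys below are illustrative, not Hudi's actual HoodieConfig internals):

    import java.util.Optional;
    import java.util.Properties;
    import java.util.function.Function;

    // Minimal sketch of "explicit value -> inferred value -> declared default"
    // resolution, the order getStringOrDefault relies on. Illustrative only.
    class ConfigResolutionSketch {
      static String getStringOrDefault(Properties props, String key,
          Function<Properties, Optional<String>> infer, String defaultValue) {
        String explicit = props.getProperty(key); // 1. an explicitly set value always wins
        if (explicit != null) {
          return explicit;
        }
        // 2. otherwise try to infer from related settings, 3. else fall back to the default
        return infer.apply(props).orElse(defaultValue);
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("hoodie.table.name", "trips"); // only the table config is present
        // hoodie.datasource.hive_sync.table is unset, so it is inferred from the table name:
        String tableName = getStringOrDefault(props, "hoodie.datasource.hive_sync.table",
            p -> Optional.ofNullable(p.getProperty("hoodie.table.name")), "unknown_table");
        System.out.println(tableName); // prints "trips"
      }
    }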
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 33f3064631f1a..f3c8b3da5e380 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -47,6 +47,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.hive.ddl.HiveQueryDDLExecutor;
 import org.apache.hudi.hive.ddl.QueryBasedDDLExecutor;
 
@@ -92,6 +93,7 @@
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
+import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -138,6 +140,7 @@ public static void setUp() throws IOException, InterruptedException, HiveExcepti
     hiveSyncProps.setProperty(META_SYNC_ASSUME_DATE_PARTITION.key(), "true");
     hiveSyncProps.setProperty(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), "false");
     hiveSyncProps.setProperty(META_SYNC_PARTITION_FIELDS.key(), "datestr");
+    hiveSyncProps.setProperty(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), SlashEncodedDayPartitionValueExtractor.class.getName());
     hiveSyncProps.setProperty(HIVE_BATCH_SYNC_PARTITION_NUM.key(), "3");
 
     hiveSyncConfig = new HiveSyncConfig(hiveSyncProps, hiveTestService.getHiveConf());
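HiveTestUtil now pins the partition extractor explicitly because the global default switches to MultiPartKeysValueExtractor in the HoodieSyncConfig hunk further down. For orientation: a slash-encoded day extractor maps a storage partition path such as 2018/08/31 to the single Hive partition value 2018-08-31. A rough sketch of that mapping, assuming the documented yyyy/MM/dd layout (this mirrors the behavior of SlashEncodedDayPartitionValueExtractor, not its actual source):

    import java.util.Collections;
    import java.util.List;

    // Rough sketch of slash-encoded day extraction: a partition path laid out as
    // yyyy/MM/dd on storage becomes the single Hive partition value "yyyy-MM-dd".
    class SlashEncodedDaySketch {
      static List<String> extractPartitionValuesInPath(String partitionPath) {
        String[] parts = partitionPath.split("/");
        if (parts.length != 3) {
          throw new IllegalArgumentException("Expected yyyy/MM/dd, got: " + partitionPath);
        }
        return Collections.singletonList(parts[0] + "-" + parts[1] + "-" + parts[2]);
      }

      public static void main(String[] args) {
        // e.g. the demo's day-partitioned path: 2018/08/31 -> [2018-08-31]
        System.out.println(extractPartitionValuesInPath("2018/08/31"));
      }
    }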
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
index d38cbc9524ef8..b2df64133e965 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncConfig.java
@@ -32,9 +32,13 @@
 import com.beust.jcommander.Parameter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS;
 import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
@@ -46,6 +50,8 @@
  */
 public class HoodieSyncConfig extends HoodieConfig {
 
+  private static final Logger LOG = LogManager.getLogger(HoodieSyncConfig.class);
+
   public static final ConfigProperty<String> META_SYNC_BASE_PATH = ConfigProperty
       .key("hoodie.datasource.meta.sync.base.path")
       .defaultValue("")
@@ -84,7 +90,7 @@ public class HoodieSyncConfig extends HoodieConfig {
 
   public static final ConfigProperty<String> META_SYNC_PARTITION_EXTRACTOR_CLASS = ConfigProperty
       .key("hoodie.datasource.hive_sync.partition_extractor_class")
-      .defaultValue("org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor")
+      .defaultValue("org.apache.hudi.hive.MultiPartKeysValueExtractor")
       .withInferFunction(cfg -> {
         if (StringUtils.nonEmpty(cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME))) {
           int numOfPartFields = cfg.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME).split(",").length;
@@ -138,7 +144,12 @@ public HoodieSyncConfig(Properties props) {
 
   public HoodieSyncConfig(Properties props, Configuration hadoopConf) {
     super(props);
-    setDefaults(getClass().getName());
+    LOG.debug("Passed in properties:\n" + props.entrySet()
+        .stream()
+        .sorted(Comparator.comparing(e -> e.getKey().toString()))
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .collect(Collectors.joining("\n")));
+    setDefaults(HoodieSyncConfig.class.getName());
     this.hadoopConf = hadoopConf;
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index badd3ab6275c7..f3d9af3150706 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -705,6 +705,7 @@ public void runMetaSync() {
 
     TypedProperties metaProps = new TypedProperties();
     metaProps.putAll(props);
+    metaProps.putAll(writeClient.getConfig().getProps());
     if (props.getBoolean(HIVE_SYNC_BUCKET_SYNC.key(), HIVE_SYNC_BUCKET_SYNC.defaultValue())) {
       metaProps.put(HIVE_SYNC_BUCKET_SYNC_SPEC.key(), HiveSyncConfig.getBucketSpec(props.getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD.key()),
           props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
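Two details in the last two files are easy to miss. First, setDefaults(getClass().getName()) was fragile: setDefaults resolves defaults by class name, and getClass() inside the HoodieSyncConfig constructor names the runtime subclass (for example HiveSyncConfig), so the lookup targeted the wrong class whenever a subclass was constructed; pinning HoodieSyncConfig.class.getName() fixes that. Second, DeltaSync now copies the write client's resolved config into the meta-sync properties so values inferred at write time carry over into hive sync. The following self-contained snippet demonstrates the getClass() behavior (Base and Sub are stand-in names for this demo, not Hudi classes):

    // Demonstrates why setDefaults(getClass().getName()) misbehaves for subclasses:
    // inside a superclass constructor, getClass() returns the runtime (sub)class,
    // so reflection keyed on that name scans the subclass rather than the class
    // that actually declares the defaults.
    class Base {
      Base() {
        System.out.println("getClass().getName() = " + getClass().getName()); // "Sub"
        System.out.println("Base.class.getName() = " + Base.class.getName()); // "Base"
      }
    }

    class Sub extends Base {
    }

    public class GetClassDemo {
      public static void main(String[] args) {
        new Sub(); // prints "Sub", then "Base"
      }
    }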