diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java index f1e930b126f41..c93907c4a33bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -133,7 +133,7 @@ public class HoodieClusteringConfig extends HoodieConfig { public static final ConfigProperty PRESERVE_COMMIT_METADATA = ConfigProperty .key("hoodie.clustering.preserve.commit.metadata") - .defaultValue(false) + .defaultValue(true) .sinceVersion("0.9.0") .withDocumentation("When rewriting data, preserves existing hoodie_commit_time"); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 53d68c3232a7f..6632dce86d953 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -323,9 +323,7 @@ public Pair>> readFromSource( // Retrieve the previous round checkpoints, if any Option resumeCheckpointStr = Option.empty(); if (commitTimelineOpt.isPresent()) { - // TODO: now not support replace action HUDI-1500 - Option lastCommit = commitTimelineOpt.get() - .filter(instant -> !instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).lastInstant(); + Option lastCommit = commitTimelineOpt.get().lastInstant(); if (lastCommit.isPresent()) { HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), HoodieCommitMetadata.class); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 58b665cc83780..bf826daf9fe65 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -963,8 +963,9 @@ private void deltaStreamerTestRunner(HoodieDeltaStreamer ds, Function { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs); @@ -1064,6 +1065,13 @@ public void testCleanerDeleteReplacedDataWithArchive(Boolean asyncClean) throws } } + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, + String asyncCluster, String asyncClusterMaxCommit, String preserveCommitMetadata) { + List configs = getAsyncServicesConfigs(totalRecords, autoClean, inlineCluster, inlineClusterMaxCommit, asyncCluster, asyncClusterMaxCommit); + configs.add(String.format("%s=%s", HoodieClusteringConfig.PRESERVE_COMMIT_METADATA.key(), preserveCommitMetadata)); + return configs; + } + private List getAsyncServicesConfigs(int totalRecords, String autoClean, String inlineCluster, String inlineClusterMaxCommit, String asyncCluster, String asyncClusterMaxCommit) { List configs = new ArrayList<>(); @@ -1156,8 +1164,9 @@ public void testAsyncClusteringService() throws Exception { }); } - @Test - public void testAsyncClusteringServiceWithCompaction() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"true", "false"}) + public void testAsyncClusteringServiceWithCompaction(String preserveCommitMetadata) throws Exception { String tableBasePath = dfsBasePath + "/asyncClusteringCompaction"; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; @@ -1166,7 +1175,7 @@ public void testAsyncClusteringServiceWithCompaction() throws Exception { HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT); cfg.continuousMode = true; cfg.tableType = HoodieTableType.MERGE_ON_READ.name(); - cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2")); + cfg.configs.addAll(getAsyncServicesConfigs(totalRecords, "false", "", "", "true", "2", preserveCommitMetadata)); HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc); deltaStreamerTestRunner(ds, cfg, (r) -> { TestHelpers.assertAtLeastNCommits(2, tableBasePath, dfs);