From 7a9f87cb94043c2447da84ff07ff93009c891174 Mon Sep 17 00:00:00 2001 From: dongkelun Date: Sun, 4 Sep 2022 16:30:57 +0800 Subject: [PATCH 1/2] [HUDI-3998] Fix getCommitsSinceLastCleaning failed when async cleaning --- .../action/clean/CleanActionExecutor.java | 1 + .../action/clean/CleanPlanActionExecutor.java | 17 +++++--- .../hudi/table/action/clean/CleanPlanner.java | 11 ++++++ .../utils/TestMetadataConversionUtils.java | 7 ++-- .../org/apache/hudi/table/TestCleaner.java | 39 ++++++++++++------- .../testutils/HoodieClientTestHarness.java | 7 ++-- .../src/main/avro/HoodieCleanMetadata.avsc | 1 + .../src/main/avro/HoodieCleanerPlan.avsc | 5 +++ .../apache/hudi/common/HoodieCleanStat.java | 26 ++++++++++--- .../CleanMetadataV1MigrationHandler.java | 1 + .../CleanMetadataV2MigrationHandler.java | 7 +--- .../clean/CleanPlanV1MigrationHandler.java | 10 ++--- .../clean/CleanPlanV2MigrationHandler.java | 4 +- .../apache/hudi/common/util/CleanerUtils.java | 6 ++- .../hudi/common/table/TestTimelineUtils.java | 5 ++- .../table/view/TestIncrementalFSViewSync.java | 4 +- .../common/testutils/FileCreateUtils.java | 14 ++++--- .../common/testutils/HoodieTestTable.java | 13 ++++--- .../hudi/timeline/service/RequestHandler.java | 35 +++++++++++++---- 19 files changed, 142 insertions(+), 71 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java index 22ad41160fa31..56b01ec77b62b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java @@ -167,6 +167,7 @@ List clean(HoodieEngineContext context, HoodieCleanerPlan clean ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()), actionInstant.getAction(), actionInstant.getTimestamp()) : null)) + .withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp()) .withDeletePathPattern(partitionCleanStat.deletePathPatterns()) .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles()) .withFailedDeletes(partitionCleanStat.failedDeleteFiles()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index d8e51bcd1643e..7f3b437178fd4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.model.HoodieActionInstant; import org.apache.hudi.avro.model.HoodieCleanFileInfo; +import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieCleanerPlan; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.CleanFileInfo; @@ -64,11 +65,16 @@ private int getCommitsSinceLastCleaning() { Option lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant(); HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - String latestCleanTs; - int numCommits = 0; - if (lastCleanInstant.isPresent()) { - latestCleanTs = lastCleanInstant.get().getTimestamp(); - numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants(); + int numCommits; + if (lastCleanInstant.isPresent() && !table.getActiveTimeline().isEmpty(lastCleanInstant.get())) { + try { + HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils + .deserializeHoodieCleanMetadata(table.getActiveTimeline().getInstantDetails(lastCleanInstant.get()).get()); + String lastCompletedCommitTimestamp = cleanMetadata.getLastCompletedCommitTimestamp(); + numCommits = commitTimeline.findInstantsAfter(lastCompletedCommitTimestamp).countInstants(); + } catch (IOException e) { + throw new HoodieIOException("Parsing of last clean instant " + lastCleanInstant.get() + " failed", e); + } } else { numCommits = commitTimeline.countInstants(); } @@ -123,6 +129,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { return new HoodieCleanerPlan(earliestInstant .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null), + planner.getLastCompletedCommitTimestamp(), config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(), CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete); } catch (IOException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 48d510c545b0e..529fa914fcd6c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -475,6 +475,17 @@ public Option getEarliestCommitToRetain() { return earliestCommitToRetain; } + /** + * Returns the last completed commit timestamp before clean. + */ + public String getLastCompletedCommitTimestamp() { + if (commitTimeline.lastInstant().isPresent()) { + return commitTimeline.lastInstant().get().getTimestamp(); + } else { + return ""; + } + } + /** * Determine if file slice needed to be preserved for pending compaction. * diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java index 9861506909980..1bbe10db0f557 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java @@ -258,15 +258,16 @@ private void createReplace(String instantTime, WriteOperationType writeOperation } private void createCleanMetadata(String instantTime) throws IOException { - HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), + "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); HoodieCleanStat cleanStats = new HoodieCleanStat( HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - instantTime); + instantTime, + ""); HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java index 8da877940b36b..c12b3bd294e76 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java @@ -262,7 +262,6 @@ public void testMultiClean() { HoodieWriteConfig writeConfig = getConfigBuilder() .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder() .withEnableBackupForRemoteFileSystemView(false).build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER) .allowMultipleCleans(false) @@ -455,9 +454,10 @@ private void testInsertAndCleanByVersions( /** * Test Clean-By-Commits using insert/upsert API. */ - @Test - public void testInsertAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testInsertAndCleanByCommits(boolean isAsync) throws Exception { + testInsertAndCleanByCommits(SparkRDDWriteClient::insert, SparkRDDWriteClient::upsert, false, isAsync); } /** @@ -473,7 +473,8 @@ public void testFailedInsertAndCleanByCommits() throws Exception { */ @Test public void testInsertPreppedAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, true); + testInsertAndCleanByCommits(SparkRDDWriteClient::insertPreppedRecords, SparkRDDWriteClient::upsertPreppedRecords, + true, false); } /** @@ -483,7 +484,7 @@ public void testInsertPreppedAndCleanByCommits() throws Exception { public void testBulkInsertPreppedAndCleanByCommits() throws Exception { testInsertAndCleanByCommits( (client, recordRDD, instantTime) -> client.bulkInsertPreppedRecords(recordRDD, instantTime, Option.empty()), - SparkRDDWriteClient::upsertPreppedRecords, true); + SparkRDDWriteClient::upsertPreppedRecords, true, false); } /** @@ -491,7 +492,7 @@ public void testBulkInsertPreppedAndCleanByCommits() throws Exception { */ @Test public void testBulkInsertAndCleanByCommits() throws Exception { - testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false); + testInsertAndCleanByCommits(SparkRDDWriteClient::bulkInsert, SparkRDDWriteClient::upsert, false, false); } /** @@ -505,12 +506,12 @@ public void testBulkInsertAndCleanByCommits() throws Exception { */ private void testInsertAndCleanByCommits( Function3, SparkRDDWriteClient, JavaRDD, String> insertFn, - Function3, SparkRDDWriteClient, JavaRDD, String> upsertFn, boolean isPreppedAPI) + Function3, SparkRDDWriteClient, JavaRDD, String> upsertFn, boolean isPreppedAPI, boolean isAsync) throws Exception { int maxCommits = 3; // keep upto 3 commits from the past HoodieWriteConfig cfg = getConfigBuilder() .withCleanConfig(HoodieCleanConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(maxCommits).build()) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(maxCommits).build()) .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1) .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build()) .build(); @@ -539,6 +540,10 @@ private void testInsertAndCleanByCommits( metaClient = HoodieTableMetaClient.reload(metaClient); HoodieTable table1 = HoodieSparkTable.create(cfg, context, metaClient); HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline(); + HoodieInstant lastInstant = activeTimeline.lastInstant().get(); + if (cfg.isAsyncClean()) { + activeTimeline = activeTimeline.findInstantsBefore(lastInstant.getTimestamp()); + } // NOTE: See CleanPlanner#getFilesToCleanKeepingLatestCommits. We explicitly keep one commit before earliest // commit Option earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits); @@ -560,6 +565,9 @@ private void testInsertAndCleanByCommits( LOG.debug("Data File - " + value); commitTimes.add(value.getCommitTime()); }); + if (cfg.isAsyncClean()) { + commitTimes.remove(lastInstant.getTimestamp()); + } assertEquals(acceptableCommits.stream().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()), commitTimes, "Only contain acceptable versions of file should be present"); } @@ -677,7 +685,7 @@ protected List runCleaner( String dirPath = metaClient.getBasePath() + "/" + p.getPartitionPath(); p.getSuccessDeleteFiles().forEach(p2 -> { try { - metaClient.getFs().create(new Path(dirPath, p2), true); + metaClient.getFs().create(new Path(dirPath, p2), true).close(); } catch (IOException e) { throw new HoodieIOException(e.getMessage(), e); } @@ -941,7 +949,7 @@ public void testCleanMetadataUpgradeDowngrade() { // create partition1 clean stat. HoodieCleanStat cleanStat1 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, partition1, deletePathPatterns1, successDeleteFiles1, - failedDeleteFiles1, instantTime); + failedDeleteFiles1, instantTime, ""); List deletePathPatterns2 = new ArrayList<>(); List successDeleteFiles2 = new ArrayList<>(); @@ -950,7 +958,7 @@ public void testCleanMetadataUpgradeDowngrade() { // create partition2 empty clean stat. HoodieCleanStat cleanStat2 = new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, partition2, deletePathPatterns2, successDeleteFiles2, - failedDeleteFiles2, instantTime); + failedDeleteFiles2, instantTime, ""); // map with absolute file path. Map oldExpected = new HashMap<>(); @@ -1167,12 +1175,13 @@ public void testCleaningWithZeroPartitionPaths() throws Exception { /** * Test Keep Latest Commits when there are pending compactions. */ - @Test - public void testKeepLatestCommitsWithPendingCompactions() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testKeepLatestCommitsWithPendingCompactions(boolean isAsync) throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) .withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build()) .withCleanConfig(HoodieCleanConfig.newBuilder() - .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build()) + .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).withAsyncClean(isAsync).retainCommits(2).build()) .build(); // Deletions: // . FileId Base Logs Total Retained Commits diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java index 80d185f62bc9c..f5d1289b19c6e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java @@ -728,8 +728,8 @@ public HoodieInstant createEmptyCleanMetadata(String instantTime, boolean inflig } public HoodieInstant createCleanMetadata(String instantTime, boolean inflightOnly, boolean isEmptyForAll, boolean isEmptyCompleted) throws IOException { - HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""), "", "", + new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); if (inflightOnly) { HoodieTestTable.of(metaClient).addInflightClean(instantTime, cleanerPlan); } else { @@ -739,7 +739,8 @@ HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEF Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - instantTime); + instantTime, + ""); HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata, isEmptyForAll, isEmptyCompleted); } diff --git a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc index c26b5a693b1c1..e51ecd0300cb0 100644 --- a/hudi-common/src/main/avro/HoodieCleanMetadata.avsc +++ b/hudi-common/src/main/avro/HoodieCleanMetadata.avsc @@ -23,6 +23,7 @@ {"name": "timeTakenInMillis", "type": "long"}, {"name": "totalFilesDeleted", "type": "int"}, {"name": "earliestCommitToRetain", "type": "string"}, + {"name": "lastCompletedCommitTimestamp", "type": "string", "default" : ""}, {"name": "partitionMetadata", "type": { "type" : "map", "values" : "HoodieCleanPartitionMetadata" } diff --git a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc index e4c8638c86e6f..42842c8be29e9 100644 --- a/hudi-common/src/main/avro/HoodieCleanerPlan.avsc +++ b/hudi-common/src/main/avro/HoodieCleanerPlan.avsc @@ -42,6 +42,11 @@ }], "default" : null }, + { + "name": "lastCompletedCommitTimestamp", + "type": "string", + "default" : "" + }, { "name": "policy", "type": "string" diff --git a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java index fa5d80419434b..0913a7440f020 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/HoodieCleanStat.java @@ -47,19 +47,22 @@ public class HoodieCleanStat implements Serializable { private final List failedDeleteBootstrapBaseFiles; // Earliest commit that was retained in this clean private final String earliestCommitToRetain; + // Last completed commit timestamp before clean + private final String lastCompletedCommitTimestamp; // set to true if partition is deleted private final boolean isPartitionDeleted; public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, - List successDeleteFiles, List failedDeleteFiles, String earliestCommitToRetain) { + List successDeleteFiles, List failedDeleteFiles, String earliestCommitToRetain,String lastCompletedCommitTimestamp) { this(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, earliestCommitToRetain, - CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(), + lastCompletedCommitTimestamp, CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(), CollectionUtils.createImmutableList(), false); } public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List deletePathPatterns, List successDeleteFiles, List failedDeleteFiles, - String earliestCommitToRetain, List deleteBootstrapBasePathPatterns, + String earliestCommitToRetain,String lastCompletedCommitTimestamp, + List deleteBootstrapBasePathPatterns, List successDeleteBootstrapBaseFiles, List failedDeleteBootstrapBaseFiles, boolean isPartitionDeleted) { @@ -69,6 +72,7 @@ public HoodieCleanStat(HoodieCleaningPolicy policy, String partitionPath, List failedDeleteFiles; private String partitionPath; private String earliestCommitToRetain; + private String lastCompletedCommitTimestamp; private List deleteBootstrapBasePathPatterns; private List successDeleteBootstrapBaseFiles; private List failedDeleteBootstrapBaseFiles; @@ -181,6 +190,11 @@ public Builder withEarliestCommitRetained(Option earliestCommitTo return this; } + public Builder withLastCompletedCommitTimestamp(String lastCompletedCommitTimestamp) { + this.lastCompletedCommitTimestamp = lastCompletedCommitTimestamp; + return this; + } + public Builder isPartitionDeleted(boolean isPartitionDeleted) { this.isPartitionDeleted = isPartitionDeleted; return this; @@ -188,8 +202,8 @@ public Builder isPartitionDeleted(boolean isPartitionDeleted) { public HoodieCleanStat build() { return new HoodieCleanStat(policy, partitionPath, deletePathPatterns, successDeleteFiles, failedDeleteFiles, - earliestCommitToRetain, deleteBootstrapBasePathPatterns, successDeleteBootstrapBaseFiles, - failedDeleteBootstrapBaseFiles, isPartitionDeleted); + earliestCommitToRetain, lastCompletedCommitTimestamp, deleteBootstrapBasePathPatterns, + successDeleteBootstrapBaseFiles, failedDeleteBootstrapBaseFiles, isPartitionDeleted); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java index 0b69894051303..ca1c5506e5e39 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV1MigrationHandler.java @@ -83,6 +83,7 @@ public HoodieCleanMetadata downgradeFrom(HoodieCleanMetadata input) { return HoodieCleanMetadata.newBuilder() .setEarliestCommitToRetain(input.getEarliestCommitToRetain()) + .setLastCompletedCommitTimestamp(input.getLastCompletedCommitTimestamp()) .setStartCleanTime(input.getStartCleanTime()) .setTimeTakenInMillis(input.getTimeTakenInMillis()) .setTotalFilesDeleted(input.getTotalFilesDeleted()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java index d74dd888eca38..5eb47d2647619 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanMetadataV2MigrationHandler.java @@ -48,12 +48,6 @@ public Integer getManagedVersion() { public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) { ValidationUtils.checkArgument(input.getVersion() == 1, "Input version is " + input.getVersion() + ". Must be 1"); - HoodieCleanMetadata metadata = new HoodieCleanMetadata(); - metadata.setEarliestCommitToRetain(input.getEarliestCommitToRetain()); - metadata.setTimeTakenInMillis(input.getTimeTakenInMillis()); - metadata.setStartCleanTime(input.getStartCleanTime()); - metadata.setTotalFilesDeleted(input.getTotalFilesDeleted()); - metadata.setVersion(getManagedVersion()); Map partitionMetadataMap = input.getPartitionMetadata() .entrySet() @@ -80,6 +74,7 @@ public HoodieCleanMetadata upgradeFrom(HoodieCleanMetadata input) { return HoodieCleanMetadata.newBuilder() .setEarliestCommitToRetain(input.getEarliestCommitToRetain()) + .setLastCompletedCommitTimestamp(input.getLastCompletedCommitTimestamp()) .setStartCleanTime(input.getStartCleanTime()) .setTimeTakenInMillis(input.getTimeTakenInMillis()) .setTotalFilesDeleted(input.getTotalFilesDeleted()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java index 66fdfeb62c207..5c8d9b8fb3e26 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV1MigrationHandler.java @@ -57,11 +57,9 @@ public HoodieCleanerPlan downgradeFrom(HoodieCleanerPlan plan) { "This version do not support METADATA_ONLY bootstrapped tables. Failed to downgrade."); } Map> filesPerPartition = plan.getFilePathsToBeDeletedPerPartition().entrySet().stream() - .map(e -> { - return Pair.of(e.getKey(), e.getValue().stream().map(v -> new Path(v.getFilePath()).getName()) - .collect(Collectors.toList())); - }).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), filesPerPartition, VERSION, - new HashMap<>(), new ArrayList<>()); + .map(e -> Pair.of(e.getKey(), e.getValue().stream().map(v -> new Path(v.getFilePath()).getName()) + .collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getLastCompletedCommitTimestamp(), + plan.getPolicy(), filesPerPartition, VERSION, new HashMap<>(), new ArrayList<>()); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java index fd82109bd4529..c17af4020a3ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/clean/CleanPlanV2MigrationHandler.java @@ -53,8 +53,8 @@ public HoodieCleanerPlan upgradeFrom(HoodieCleanerPlan plan) { .map(v -> new HoodieCleanFileInfo( new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), e.getKey()), v).toString(), false)) .collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getPolicy(), new HashMap<>(), VERSION, - filePathsPerPartition, new ArrayList<>()); + return new HoodieCleanerPlan(plan.getEarliestInstantToRetain(), plan.getLastCompletedCommitTimestamp(), + plan.getPolicy(), new HashMap<>(), VERSION, filePathsPerPartition, new ArrayList<>()); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java index df4e9ac402c6d..513c4fa29ed25 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java @@ -61,6 +61,7 @@ public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, int totalDeleted = 0; String earliestCommitToRetain = null; + String lastCompletedCommitTimestamp = ""; for (HoodieCleanStat stat : cleanStats) { HoodieCleanPartitionMetadata metadata = new HoodieCleanPartitionMetadata(stat.getPartitionPath(), stat.getPolicy().name(), @@ -77,11 +78,12 @@ public static HoodieCleanMetadata convertCleanMetadata(String startCleanTime, if (earliestCommitToRetain == null) { // This will be the same for all partitions earliestCommitToRetain = stat.getEarliestCommitToRetain(); + lastCompletedCommitTimestamp = stat.getLastCompletedCommitTimestamp(); } } - return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, - earliestCommitToRetain, partitionMetadataMap, CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap); + return new HoodieCleanMetadata(startCleanTime, durationInMs.orElseGet(() -> -1L), totalDeleted, earliestCommitToRetain, + lastCompletedCommitTimestamp, partitionMetadataMap, CLEAN_METADATA_VERSION_2, partitionBootstrapMetadataMap); } /** diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 22ceb5bfef373..380c4c5212553 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -140,7 +140,7 @@ public void testGetPartitions() throws IOException { } @Test - public void testGetPartitionsUnpartitioned() throws IOException { + public void testGetPartitionsUnPartitioned() throws IOException { HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); assertTrue(activeCommitTimeline.empty()); @@ -217,7 +217,7 @@ public void testGetExtraMetadata() throws Exception { verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false); assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent()); - // verify adding clustering commit doesnt change behavior of getExtraMetadataFromLatest + // verify adding clustering commit doesn't change behavior of getExtraMetadataFromLatest String ts2 = "2"; HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2); activeTimeline.createNewInstant(instant2); @@ -338,6 +338,7 @@ private Option getCleanMetadata(String partition, String time) throws IO .setTotalFilesDeleted(1) .setStartCleanTime(time) .setEarliestCommitToRetain(time) + .setLastCompletedCommitTimestamp("") .setPartitionMetadata(partitionToFilesCleaned).build(); return TimelineMetadataUtils.serializeCleanMetadata(cleanMetadata); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java index 1c59558c94ce7..2f284c5befd1e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestIncrementalFSViewSync.java @@ -536,7 +536,7 @@ private void performClean(String instant, List files, String cleanInstan Map> partititonToFiles = deleteFiles(files); List cleanStats = partititonToFiles.entrySet().stream().map(e -> new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, e.getKey(), e.getValue(), e.getValue(), - new ArrayList<>(), Integer.toString(Integer.parseInt(instant) + 1))).collect(Collectors.toList()); + new ArrayList<>(), Integer.toString(Integer.parseInt(instant) + 1), "")).collect(Collectors.toList()); HoodieInstant cleanInflightInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, cleanInstant); metaClient.getActiveTimeline().createNewInstant(cleanInflightInstant); @@ -860,7 +860,7 @@ private List addReplaceInstant(HoodieTableMetaClient metaClient, String List> writeStats, Map> partitionToReplaceFileIds) throws IOException { // created requested - HoodieInstant newRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instant); + HoodieInstant newRequestedInstant = new HoodieInstant(State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instant); HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder() .setOperationType(WriteOperationType.UNKNOWN.name()).build(); metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index f631ec94b0e4b..8be78a3a96927 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -322,7 +322,9 @@ public static void createBaseFile(String basePath, String partitionPath, String if (Files.notExists(baseFilePath)) { Files.createFile(baseFilePath); } - new RandomAccessFile(baseFilePath.toFile(), "rw").setLength(length); + RandomAccessFile raf = new RandomAccessFile(baseFilePath.toFile(), "rw"); + raf.setLength(length); + raf.close(); Files.setLastModifiedTime(baseFilePath, FileTime.fromMillis(lastModificationTimeMilli)); } @@ -344,7 +346,9 @@ public static void createLogFile(String basePath, String partitionPath, String i if (Files.notExists(logFilePath)) { Files.createFile(logFilePath); } - new RandomAccessFile(logFilePath.toFile(), "rw").setLength(length); + RandomAccessFile raf = new RandomAccessFile(logFilePath.toFile(), "rw"); + raf.setLength(length); + raf.close(); } public static String createMarkerFile(String basePath, String partitionPath, String instantTime, String fileId, IOType ioType) @@ -390,13 +394,13 @@ public static void deleteRollbackCommit(String basePath, String instantTime) thr removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION); } - public static java.nio.file.Path renameFileToTemp(java.nio.file.Path sourcePath, String instantTime) throws IOException { - java.nio.file.Path dummyFilePath = sourcePath.getParent().resolve(instantTime + ".temp"); + public static Path renameFileToTemp(Path sourcePath, String instantTime) throws IOException { + Path dummyFilePath = sourcePath.getParent().resolve(instantTime + ".temp"); Files.move(sourcePath, dummyFilePath); return dummyFilePath; } - public static void renameTempToMetaFile(java.nio.file.Path tempFilePath, java.nio.file.Path destPath) throws IOException { + public static void renameTempToMetaFile(Path tempFilePath, Path destPath) throws IOException { Files.move(tempFilePath, destPath); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index 1351d1681212d..31de82b12cbd5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -304,26 +304,27 @@ public HoodieTestTable addClean(String instantTime, HoodieCleanerPlan cleanerPla } public HoodieTestTable addClean(String instantTime) throws IOException { - HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(EMPTY_STRING, EMPTY_STRING, EMPTY_STRING), + EMPTY_STRING, EMPTY_STRING, new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); HoodieCleanStat cleanStats = new HoodieCleanStat( HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, HoodieTestUtils.DEFAULT_PARTITION_PATHS[RANDOM.nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)], Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), - instantTime); + instantTime, + ""); HoodieCleanMetadata cleanMetadata = convertCleanMetadata(instantTime, Option.of(0L), Collections.singletonList(cleanStats)); return HoodieTestTable.of(metaClient).addClean(instantTime, cleanerPlan, cleanMetadata); } public Pair getHoodieCleanMetadata(String commitTime, HoodieTestTableState testTableState) { - HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), EMPTY_STRING, new HashMap<>(), - CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); + HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant(commitTime, CLEAN_ACTION, EMPTY_STRING), + EMPTY_STRING, EMPTY_STRING, new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>()); List cleanStats = new ArrayList<>(); for (Map.Entry> entry : testTableState.getPartitionToFileIdMapForCleaner(commitTime).entrySet()) { cleanStats.add(new HoodieCleanStat(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS, - entry.getKey(), entry.getValue(), entry.getValue(), Collections.emptyList(), commitTime)); + entry.getKey(), entry.getValue(), entry.getValue(), Collections.emptyList(), commitTime, "")); } return Pair.of(cleanerPlan, convertCleanMetadata(commitTime, Option.of(0L), cleanStats)); } diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 909e4c51404ef..caf4ed21ccd91 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -502,14 +502,18 @@ public void handle(@NotNull Context context) throws Exception { if (refreshCheck) { long beginFinalCheck = System.currentTimeMillis(); if (isLocalViewBehind(context)) { - String errMsg = - "Last known instant from client was " - + context.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, - HoodieTimeline.INVALID_INSTANT_TS) - + " but server has the following timeline " - + viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)) - .getTimeline().getInstants().collect(Collectors.toList()); - throw new BadRequestResponse(errMsg); + String lastInstantTs = context.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, + HoodieTimeline.INVALID_INSTANT_TS); + HoodieTimeline localTimeline = + viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline(); + if (shouldThrowExceptionIfLocalViewBehind(localTimeline, lastInstantTs)) { + String errMsg = + "Last known instant from client was " + + lastInstantTs + + " but server has the following timeline " + + localTimeline.getInstants().collect(Collectors.toList()); + throw new BadRequestResponse(errMsg); + } } long endFinalCheck = System.currentTimeMillis(); finalCheckTimeTaken = endFinalCheck - beginFinalCheck; @@ -539,4 +543,19 @@ public void handle(@NotNull Context context) throws Exception { } } } + + /** + * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. + */ + private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String lastInstantTs) { + HoodieTimeline afterLastInstantTimeLine = localTimeline.findInstantsAfter(lastInstantTs).filterCompletedInstants(); + // When performing async clean, we may have one more .clean.completed after lastInstantTs. + // In this case, we do not need to throw an exception. + if (afterLastInstantTimeLine.countInstants() == 1 + && afterLastInstantTimeLine.filter(s -> s.getAction().equals(HoodieTimeline.CLEAN_ACTION)).countInstants() == 1) { + return false; + } else { + return true; + } + } } From 57cd61a9a02c62becbf0b763d322d0f70e68b588 Mon Sep 17 00:00:00 2001 From: dongkelun Date: Thu, 8 Sep 2022 11:25:48 +0800 Subject: [PATCH 2/2] Modify the logic of shouldThrowExceptionIfLocalViewBehind method --- .../hudi/timeline/service/RequestHandler.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index caf4ed21ccd91..f59a2ceba891c 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.table.marker.MarkerOperation; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.dto.BaseFileDTO; import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO; @@ -33,6 +34,7 @@ import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView; import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.timeline.service.handlers.BaseFileHandler; import org.apache.hudi.timeline.service.handlers.FileSliceHandler; @@ -502,14 +504,15 @@ public void handle(@NotNull Context context) throws Exception { if (refreshCheck) { long beginFinalCheck = System.currentTimeMillis(); if (isLocalViewBehind(context)) { - String lastInstantTs = context.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, + String lastKnownInstantFromClient = context.queryParam(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, HoodieTimeline.INVALID_INSTANT_TS); + String timelineHashFromClient = context.queryParam(RemoteHoodieTableFileSystemView.TIMELINE_HASH, ""); HoodieTimeline localTimeline = viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline(); - if (shouldThrowExceptionIfLocalViewBehind(localTimeline, lastInstantTs)) { + if (shouldThrowExceptionIfLocalViewBehind(localTimeline, timelineHashFromClient)) { String errMsg = "Last known instant from client was " - + lastInstantTs + + lastKnownInstantFromClient + " but server has the following timeline " + localTimeline.getInstants().collect(Collectors.toList()); throw new BadRequestResponse(errMsg); @@ -547,12 +550,12 @@ public void handle(@NotNull Context context) throws Exception { /** * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. */ - private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String lastInstantTs) { - HoodieTimeline afterLastInstantTimeLine = localTimeline.findInstantsAfter(lastInstantTs).filterCompletedInstants(); + private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String timelineHashFromClient) { + Option lastInstant = localTimeline.lastInstant(); // When performing async clean, we may have one more .clean.completed after lastInstantTs. // In this case, we do not need to throw an exception. - if (afterLastInstantTimeLine.countInstants() == 1 - && afterLastInstantTimeLine.filter(s -> s.getAction().equals(HoodieTimeline.CLEAN_ACTION)).countInstants() == 1) { + if (lastInstant.isPresent() && lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION) + && localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient)) { return false; } else { return true;