diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 5c9930d606aaa..4cfbcee3b4e08 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -126,7 +126,7 @@ public void open() throws Exception { public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { super.prepareSnapshotPreBarrier(checkpointId); String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get()); - Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName); + Path path = generateCurrentMakerFilePath(instantMarkerFileName); // create marker file fs.create(path, true); LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName); @@ -245,7 +245,7 @@ public void close() throws Exception { private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); FileStatus[] fileStatuses; - Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME); + Path instantMarkerPath = generateCurrentMakerDirPath(); // waiting all subtask create marker file ready while (true) { Thread.sleep(500L); @@ -297,4 +297,13 @@ private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { fs.delete(fileStatus.getPath(), true); } } + + private Path generateCurrentMakerDirPath() { + Path auxPath = new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME); + } + + private Path generateCurrentMakerFilePath(String instantMarkerFileName) { + return new Path(generateCurrentMakerDirPath(), instantMarkerFileName); + } }