From 34005770fd3a652b22a6cfeca87318053a18f53e Mon Sep 17 00:00:00 2001 From: zhangchaoming Date: Fri, 5 Feb 2021 17:08:57 +0800 Subject: [PATCH 1/5] Modify maker file path, which should start with the target base path. --- .../hudi/operator/InstantGenerateOperator.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 5c9930d606aaa..44e948ce06db3 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -126,7 +126,7 @@ public void open() throws Exception { public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { super.prepareSnapshotPreBarrier(checkpointId); String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get()); - Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName); + Path path = generateCurrentMakerFilePath(instantMarkerFileName); // create marker file fs.create(path, true); LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName); @@ -245,7 +245,7 @@ public void close() throws Exception { private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); FileStatus[] fileStatuses; - Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME); + Path instantMarkerPath = generateCurrentMakerPath(); // waiting all subtask create marker file ready while (true) { Thread.sleep(500L); @@ -297,4 +297,15 @@ private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { fs.delete(fileStatus.getPath(), true); } } + private Path generateCurrentMakerPath() { + String baseDir = cfg.targetBasePath.endsWith("/") + ? cfg.targetBasePath.substring(0, cfg.targetBasePath.length() - 1) + : cfg.targetBasePath; + Path auxPath = new Path(baseDir, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME); + } + + private Path generateCurrentMakerFilePath(String instantMarkerFileName) { + return new Path(generateCurrentMakerPath(), instantMarkerFileName); + } } From c6d3f7197209d39feacf50f732b06e7e470ffb27 Mon Sep 17 00:00:00 2001 From: zhangchaoming Date: Fri, 5 Feb 2021 19:02:58 +0800 Subject: [PATCH 2/5] Fix checkstyle. --- .../java/org/apache/hudi/operator/InstantGenerateOperator.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 44e948ce06db3..31ed314c7dd66 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -297,6 +297,7 @@ private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { fs.delete(fileStatus.getPath(), true); } } + private Path generateCurrentMakerPath() { String baseDir = cfg.targetBasePath.endsWith("/") ? cfg.targetBasePath.substring(0, cfg.targetBasePath.length() - 1) From 158f245a4539be11280da90da1e3e4a74e122139 Mon Sep 17 00:00:00 2001 From: zhangchaoming Date: Sun, 7 Feb 2021 14:06:54 +0800 Subject: [PATCH 3/5] Modify function name generateCurrentMakerPath() to generateCurrentMakerDirPath() and remove unnecessary code. --- .../apache/hudi/operator/InstantGenerateOperator.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 31ed314c7dd66..804dc84d3875f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -298,15 +298,12 @@ private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { } } - private Path generateCurrentMakerPath() { - String baseDir = cfg.targetBasePath.endsWith("/") - ? cfg.targetBasePath.substring(0, cfg.targetBasePath.length() - 1) - : cfg.targetBasePath; - Path auxPath = new Path(baseDir, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); + private Path generateCurrentMakerDirPath() { + Path auxPath = new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME); return new Path(auxPath, INSTANT_MARKER_FOLDER_NAME); } private Path generateCurrentMakerFilePath(String instantMarkerFileName) { - return new Path(generateCurrentMakerPath(), instantMarkerFileName); + return new Path(generateCurrentMakerDirPath(), instantMarkerFileName); } } From 4ac5d1958360a15550a061640446fd8d68072ab9 Mon Sep 17 00:00:00 2001 From: zhangchaoming Date: Sun, 7 Feb 2021 14:09:45 +0800 Subject: [PATCH 4/5] Modify function name generateCurrentMakerPath() to generateCurrentMakerDirPath() and remove unnecessary code. --- .../java/org/apache/hudi/operator/InstantGenerateOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 804dc84d3875f..4cfbcee3b4e08 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -245,7 +245,7 @@ public void close() throws Exception { private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException { int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks(); FileStatus[] fileStatuses; - Path instantMarkerPath = generateCurrentMakerPath(); + Path instantMarkerPath = generateCurrentMakerDirPath(); // waiting all subtask create marker file ready while (true) { Thread.sleep(500L); From 4339a60fe758fa6eec5747f160f1662f0d21e2a1 Mon Sep 17 00:00:00 2001 From: zhangchaoming Date: Mon, 8 Feb 2021 15:06:09 +0800 Subject: [PATCH 5/5] Empty commit to trigger the CI.