From 708a1e072cecfe4c78e7c3159a5d6e122d532024 Mon Sep 17 00:00:00 2001
From: hbg
Date: Mon, 24 Apr 2023 23:45:10 +0800
Subject: [PATCH] [HUDI-6134] prevent clean run concurrently in flink.

---
 .../main/java/org/apache/hudi/sink/CleanFunction.java       | 11 +++++++++--
 .../hudi/sink/clustering/ClusteringCommitSink.java          |  2 +-
 .../hudi/sink/compact/CompactionCommitSink.java             |  2 +-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 638fe9fdab286..b674df1771504 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -51,7 +51,7 @@ public class CleanFunction extends AbstractRichFunction
 
   private NonThrownExecutor executor;
 
-  private volatile boolean isCleaning;
+  protected volatile boolean isCleaning;
 
   public CleanFunction(Configuration conf) {
     this.conf = conf;
@@ -64,7 +64,14 @@ public void open(Configuration parameters) throws Exception {
     this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
     LOG.info(String.format("exec clean with instant time %s...", instantTime));
-    executor.execute(() -> writeClient.clean(instantTime), "wait for cleaning finish");
+    executor.execute(() -> {
+      this.isCleaning = true;
+      try {
+        this.writeClient.clean(instantTime);
+      } finally {
+        this.isCleaning = false;
+      }
+    }, "wait for cleaning finish");
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index 3f392de1527a6..7fde3e87aa419 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -179,7 +179,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, List<
         TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
     // whether to clean up the input base parquet files used for clustering
-    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       LOG.info("Running inline clean");
       this.writeClient.clean();
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index 0e9bc54f8fbb7..828aa3c42651f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -175,7 +175,7 @@ private void doCommit(String instant, Collection events)
     this.writeClient.commitCompaction(instant, metadata, Option.empty());
 
     // Whether to clean up the old log file when compaction
-    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
+    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
       this.writeClient.clean();
     }
   }
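
The pattern the patch relies on can be illustrated outside of Hudi: an async task raises a volatile flag for the duration of its run, and the inline path skips its own run while the flag is up. The sketch below is illustrative only; AsyncGuardSketch, cleanAsync, cleanInlineIfIdle and doClean are made-up names standing in for CleanFunction, the NonThrownExecutor task and writeClient.clean(), not Hudi APIs.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncGuardSketch {
  // Mirrors CleanFunction.isCleaning: written by the async thread, read by the commit path.
  private volatile boolean isCleaning;

  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  /** Async path: keep the flag up for the whole run, as CleanFunction.open does. */
  public void cleanAsync() {
    executor.execute(() -> {
      this.isCleaning = true;
      try {
        doClean(); // stands in for writeClient.clean(instantTime)
      } finally {
        this.isCleaning = false;
      }
    });
  }

  /** Inline path: skip when an async clean is already in flight, as the commit sinks now do. */
  public void cleanInlineIfIdle() {
    if (!isCleaning) {
      doClean(); // stands in for writeClient.clean()
    }
  }

  private void doClean() {
    try {
      TimeUnit.MILLISECONDS.sleep(100); // simulate cleaning work
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    AsyncGuardSketch sketch = new AsyncGuardSketch();
    sketch.cleanAsync();          // flag goes up while the async clean runs
    sketch.cleanInlineIfIdle();   // skipped if the async clean is still in flight
    sketch.executor.shutdown();
    sketch.executor.awaitTermination(1, TimeUnit.SECONDS);
  }
}

A plain volatile boolean is enough here because only the single async task writes the flag and the commit path merely reads it to decide whether to skip; it is a best-effort guard against overlapping cleans, not a lock.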