diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index 1ca593ff53501..e75fad5fde21e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -81,8 +81,13 @@ public void notifyCheckpointComplete(long l) throws Exception { @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) { - this.writeClient.startAsyncCleaning(); - this.isCleaning = true; + try { + this.writeClient.startAsyncCleaning(); + this.isCleaning = true; + } catch (Throwable throwable) { + // catch the exception to not affect the normal checkpointing + LOG.warn("Error while start async cleaning", throwable); + } } }