diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 674cd3588aaf2..f1787649b8734 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -18,6 +18,7 @@ package org.apache.hudi.sink.common; +import org.apache.flink.runtime.state.CheckpointListener; import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -55,7 +56,7 @@ */ public abstract class AbstractStreamWriteFunction extends AbstractWriteFunction - implements CheckpointedFunction { + implements CheckpointedFunction, CheckpointListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamWriteFunction.class); @@ -281,4 +282,14 @@ protected String instantToWrite(boolean hasData) { private boolean invalidInstant(String instant, boolean hasData) { return instant.equals(this.currentInstant) && hasData; } + + @Override + public void notifyCheckpointComplete(long l) throws Exception { + + } + + @Override + public void notifyCheckpointAborted(long checkpointId) throws Exception { + throw new RuntimeException("Checkpoint aborted for checkpointId [" + checkpointId + "]"); + } }