diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index d5ca307a00cd6..d636bcde3cf8f 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -267,15 +267,6 @@ public void notifyCheckpointComplete(long checkpointId) { ); } - @Override - public void notifyCheckpointAborted(long checkpointId) { - if (checkpointId == this.checkpointId && !WriteMetadataEvent.BOOTSTRAP_INSTANT.equals(this.instant)) { - executor.execute(() -> { - this.ckpMetadata.abortInstant(this.instant); - }, "abort instant %s", this.instant); - } - } - @Override public void resetToCheckpoint(long checkpointID, byte[] checkpointData) { // no operation diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 04b7f43547920..674cd3588aaf2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -243,13 +243,6 @@ protected String lastPendingInstant() { return this.ckpMetadata.lastPendingInstant(); } - /** - * Returns whether the instant is fresh new(not aborted). - */ - protected boolean freshInstant(String instant) { - return !this.ckpMetadata.isAborted(instant); - } - /** * Prepares the instant time to write with for next checkpoint. * @@ -286,6 +279,6 @@ protected String instantToWrite(boolean hasData) { * Returns whether the pending instant is invalid to write with. */ private boolean invalidInstant(String instant, boolean hasData) { - return instant.equals(this.currentInstant) && hasData && freshInstant(instant); + return instant.equals(this.currentInstant) && hasData; } }