diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index e3b0d827048e7..c87d5b2443c4e 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -341,7 +341,7 @@ private static void initMetadataTable(HoodieFlinkWriteClient> writeClient) { private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) throws IOException { CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath()); - ckpMetadata.bootstrap(metaClient); + ckpMetadata.bootstrap(); return ckpMetadata; } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 4cdebf986fa33..6895b2a0c63da 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -94,7 +94,7 @@ public void close() { * *
This expects to be called by the driver. */ - public void bootstrap(HoodieTableMetaClient metaClient) throws IOException { + public void bootstrap() throws IOException { fs.delete(path, true); fs.mkdirs(path); } @@ -173,8 +173,8 @@ private void load() { @Nullable public String lastPendingInstant() { load(); - for (int i = this.messages.size() - 1; i >= 0; i--) { - CkpMessage ckpMsg = this.messages.get(i); + if (this.messages.size() > 0) { + CkpMessage ckpMsg = this.messages.get(this.messages.size() - 1); // consider 'aborted' as pending too to reuse the instant if (!ckpMsg.isComplete()) { return ckpMsg.getInstant(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java index a6fb493b9bdda..fe7ce3f9478d6 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/meta/TestCkpMetadata.java @@ -32,6 +32,7 @@ import java.io.File; import java.util.stream.IntStream; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -63,7 +64,7 @@ void testWriteAndReadMessage() { assertThat(metadata.lastPendingInstant(), is("2")); metadata.commitInstant("2"); - assertThat(metadata.lastPendingInstant(), is("1")); + assertThat(metadata.lastPendingInstant(), equalTo(null)); // test cleaning IntStream.range(3, 6).forEach(i -> metadata.startInstant(i + ""));