diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index c20b263fa36da..6d0174069f205 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -61,7 +61,9 @@ public class CkpMetadata implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class); - protected static final int MAX_RETAIN_CKP_NUM = 3; + // 1 is actually enough for fetching the latest pending instant, + // keep 3 instants here for purpose of debugging. + private static final int MAX_RETAIN_CKP_NUM = 3; // the ckp metadata directory private static final String CKP_META = "ckp_meta"; @@ -106,15 +108,20 @@ public void startInstant(String instant) { } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e); } + // cache the instant + cache(instant); // cleaning - clean(instant); + clean(); } - private void clean(String newInstant) { + private void cache(String newInstant) { if (this.instantCache == null) { this.instantCache = new ArrayList<>(); } this.instantCache.add(newInstant); + } + + private void clean() { if (instantCache.size() > MAX_RETAIN_CKP_NUM) { final String instant = instantCache.get(0); boolean[] error = new boolean[1];