diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java index 8d310223d308d..aa1e261c250b5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java @@ -126,8 +126,8 @@ public Pair>, String> fetchNextBatch(Option lastCkpt numInstantsPerFetch, beginInstant, missingCheckpointStrategy); if (queryTypeAndInstantEndpts.getValue().getKey().equals(queryTypeAndInstantEndpts.getValue().getValue())) { - LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getKey()); - return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getKey()); + LOG.warn("Already caught up. Begin Checkpoint was :" + queryTypeAndInstantEndpts.getValue().getKey()); + return Pair.of(Option.empty(), queryTypeAndInstantEndpts.getValue().getKey()); } Dataset source = null; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java index 708d45477f4be..1f15cc3093e7a 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java @@ -48,6 +48,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; public class TestHoodieIncrSource extends HoodieClientTestHarness { @@ -77,7 +78,6 @@ public void testHoodieIncrSource() throws IOException { Pair> inserts4 = writeRecords(writeClient, true, null, "400"); Pair> inserts5 = writeRecords(writeClient, true, null, "500"); - // read everything upto latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT, Option.empty(), 500, inserts5.getKey()); @@ -89,6 +89,14 @@ public void testHoodieIncrSource() throws IOException { // read just the latest readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.empty(), 100, inserts5.getKey()); + + // ensure checkpoint does not move + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 0, inserts5.getKey()); + + Pair> inserts6 = writeRecords(writeClient, true, null, "600"); + + // insert new batch and ensure the checkpoint moves + readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST, Option.of(inserts5.getKey()), 100, inserts6.getKey()); } private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option checkpointToPull, int expectedCount, String expectedCheckpoint) { @@ -102,7 +110,11 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe // read everything until latest Pair>, String> batchCheckPoint = incrSource.fetchNextBatch(checkpointToPull, 500); Assertions.assertNotNull(batchCheckPoint.getValue()); - assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + if (expectedCount == 0) { + assertFalse(batchCheckPoint.getKey().isPresent()); + } else { + assertEquals(batchCheckPoint.getKey().get().count(), expectedCount); + } Assertions.assertEquals(batchCheckPoint.getRight(), expectedCheckpoint); }