diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 558e75710d25a..9dd232d3605a0 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -125,6 +125,11 @@ public static OffsetRange[] computeOffsetRanges(HashMap numEvents) { + long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents)); + toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd); + } ranges[i] = OffsetRange.create(range.topicAndPartition(), range.fromOffset(), toOffset); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java index 9825ae6f7d74e..9ac4bf438b635 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java @@ -183,5 +183,15 @@ public void testComputeOffsetRanges() { assertEquals(10, ranges[0].count()); assertEquals(100000, ranges[1].count()); assertEquals(10000, ranges[2].count()); + + // not all partitions consume same entries. + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}), + makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001); + assertEquals(1001, CheckpointUtils.totalNewMessages(ranges)); + assertEquals(100, ranges[0].count()); + assertEquals(226, ranges[1].count()); + assertEquals(226, ranges[2].count()); + assertEquals(226, ranges[3].count()); + assertEquals(223, ranges[4].count()); } }