diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 867621a66d8f4..ddbd24e3562d0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -414,7 +415,12 @@ public static Option medianInstantTime(String highVal, String lowVal) { ValidationUtils.checkArgument(high > low, "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); long median = low + (high - low) / 2; - return low >= median ? Option.empty() : Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median))); + final String instantTime = HoodieActiveTimeline.formatInstantTime(new Date(median)); + if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime) + || HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) { + return Option.empty(); + } + return Option.of(instantTime); } catch (ParseException e) { throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index c05e5b056344a..b9e2b916da3c8 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -86,6 +86,8 @@ void testMedianInstantTime() { assertThrows(IllegalArgumentException.class, () -> StreamerUtil.medianInstantTime(lower, higher), "The first argument should have newer instant time"); + // test very near instant time + assertFalse(StreamerUtil.medianInstantTime("20211116115634", "20211116115633").isPresent()); } @Test