diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index cdcd4e6265554..9c11a68d048ed 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -366,7 +366,7 @@ private void sendCommitAckEvents() { try { return this.context.sendEvent(CommitAckEvent.getInstance(), taskID); } catch (TaskNotRunningException e) { - throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e); + throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e); } }).toArray(CompletableFuture[]::new); try { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index e8927dc7f34f3..df856742ef835 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -99,7 +99,7 @@ public static void rollbackCompaction(HoodieFlinkTable table, Configuration c .filterPendingCompactionTimeline() .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT - && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds); + && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds); inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { LOG.info("Rollback the pending compaction instant: " + inflightInstant); table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4b94d26bd8921..d69481dc08b9a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -27,6 +27,8 @@ import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -52,6 +54,8 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.StringReader; +import java.text.ParseException; +import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -265,16 +269,28 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti * Return the median instant time between the given two instant time. */ public static String medianInstantTime(String highVal, String lowVal) { - long high = Long.parseLong(highVal); - long low = Long.parseLong(lowVal); - long median = low + (high - low) / 2; - return String.valueOf(median); + try { + long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime(); + long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime(); + ValidationUtils.checkArgument(high > low, + "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); + long median = low + (high - low) / 2; + return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median)); + } catch (ParseException e) { + throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e); + } } /** * Returns the time interval in seconds between the given instant time. */ - public static long instantTimeDiff(String newInstantTime, String oldInstantTime) { - return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime); + public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) { + try { + long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime(); + long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime(); + return (newTimestamp - oldTimestamp) / 1000; + } catch (ParseException e) { + throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e); + } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index a0072c28b9183..2f6037c3d54fa 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -24,25 +24,30 @@ import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +/** + * Test cases for {@link StreamerUtil}. + */ public class TestStreamerUtil { @TempDir File tempFile; @Test - public void testInitTableIfNotExists() throws IOException { + void testInitTableIfNotExists() throws IOException { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); // Test for partitioned table. @@ -70,5 +75,25 @@ public void testInitTableIfNotExists() throws IOException { .build(); assertFalse(metaClient2.getTableConfig().getPartitionColumns().isPresent()); } + + @Test + void testMedianInstantTime() { + String higher = "20210705125921"; + String lower = "20210705125806"; + String median1 = StreamerUtil.medianInstantTime(higher, lower); + assertThat(median1, is("20210705125843")); + // test symmetry + assertThrows(IllegalArgumentException.class, + () -> StreamerUtil.medianInstantTime(lower, higher), + "The first argument should have newer instant time"); + } + + @Test + void testInstantTimeDiff() { + String higher = "20210705125921"; + String lower = "20210705125806"; + long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); + assertThat(diff, is(75L)); + } }