Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private void sendCommitAckEvents() {
try {
return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
} catch (TaskNotRunningException e) {
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e);
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e);
}
}).toArray(CompletableFuture<?>[]::new);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public static void rollbackCompaction(HoodieFlinkTable<?> table, Configuration c
.filterPendingCompactionTimeline()
.filter(instant ->
instant.getState() == HoodieInstant.State.INFLIGHT
&& StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
LOG.info("Rollback the pending compaction instant: " + inflightInstant);
table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
Expand Down
28 changes: 22 additions & 6 deletions hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieStorageConfig;
Expand All @@ -52,6 +54,8 @@
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
Expand Down Expand Up @@ -265,16 +269,28 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf, Runti
* Return the median instant time between the given two instant time.
*/
public static String medianInstantTime(String highVal, String lowVal) {
long high = Long.parseLong(highVal);
long low = Long.parseLong(lowVal);
long median = low + (high - low) / 2;
return String.valueOf(median);
try {
long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime();
long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime();
ValidationUtils.checkArgument(high > low,
"Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
long median = low + (high - low) / 2;
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median));
} catch (ParseException e) {
throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
}
}

/**
* Returns the time interval in seconds between the given instant time.
*/
public static long instantTimeDiff(String newInstantTime, String oldInstantTime) {
return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime);
public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
try {
long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime();
long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime();
return (newTimestamp - oldTimestamp) / 1000;
} catch (ParseException e) {
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,30 @@
import org.apache.hudi.util.StreamerUtil;

import org.apache.flink.configuration.Configuration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.io.IOException;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Test cases for {@link StreamerUtil}.
*/
public class TestStreamerUtil {

@TempDir
File tempFile;

@Test
public void testInitTableIfNotExists() throws IOException {
void testInitTableIfNotExists() throws IOException {
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());

// Test for partitioned table.
Expand Down Expand Up @@ -70,5 +75,25 @@ public void testInitTableIfNotExists() throws IOException {
.build();
assertFalse(metaClient2.getTableConfig().getPartitionColumns().isPresent());
}

@Test
void testMedianInstantTime() {
String higher = "20210705125921";
String lower = "20210705125806";
String median1 = StreamerUtil.medianInstantTime(higher, lower);
assertThat(median1, is("20210705125843"));
// test symmetry
assertThrows(IllegalArgumentException.class,
() -> StreamerUtil.medianInstantTime(lower, higher),
"The first argument should have newer instant time");
}

@Test
void testInstantTimeDiff() {
String higher = "20210705125921";
String lower = "20210705125806";
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
assertThat(diff, is(75L));
}
}