Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co

public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.formatInstantTime(date);
return HoodieActiveTimeline.formatDate(date);
}

/**
Expand All @@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
* b) hours: -1, returns 20200202010000
*/
public static String addHours(String compactionCommitTime, int hours) throws ParseException {
Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant();
Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant()));
return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String

if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs,
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
private Long parsedToSeconds(String time) {
long timestamp;
try {
timestamp = HoodieActiveTimeline.parseInstantTime(time).getTime() / 1000;
timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
} catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public void completeCompaction(
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down Expand Up @@ -405,7 +405,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(clusteringCommitTime).getTime(),
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ public boolean rollbackCommit(String instantTime) {
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);

Long rollbackTime = HoodieActiveTimeline.parseInstantTime(instantTime).getTime();
Long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
Long currentTime = new Date().getTime();
Scan scan = new Scan();
scan.addFamily(SYSTEM_COLUMN_FAMILY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
protected HoodieTableMetaClient metaClient;

/**
* Parse the timestamp of an Instant and return a {@code SimpleDateFormat}.
* Parse the timestamp of an Instant and return a {@code Date}.
*/
public static Date parseInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseInstantTime(timestamp);
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp);
}

/**
Expand All @@ -88,8 +88,8 @@ public static String formatInstantTime(Instant timestamp) {
/**
* Format the Date to a String representing the timestamp of a Hoodie Instant.
*/
public static String formatInstantTime(Date timestamp) {
return HoodieInstantTimeGenerator.formatInstantTime(timestamp);
public static String formatDate(Date timestamp) {
return HoodieInstantTimeGenerator.formatDate(timestamp);
}

/**
Expand All @@ -100,6 +100,7 @@ public static String createNewInstantTime() {
return HoodieInstantTimeGenerator.createNewInstantTime(0);
}


/**
* Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;
import java.util.Date;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -33,14 +35,27 @@
*/
public class HoodieInstantTimeGenerator {
// Format of the timestamp used for an Instant
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length();
public static final String SECS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
public static final int SECS_INSTANT_ID_LENGTH = SECS_INSTANT_TIMESTAMP_FORMAT.length();
public static final String MILLIS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmssSSS";
public static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
public static final int MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
// Formatter to generate Instant timestamps
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
// Unfortunately millisecond format is not parsable as is https://bugs.openjdk.java.net/browse/JDK-8031085. hence have to do appendValue()
private static DateTimeFormatter MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT)
.appendValue(ChronoField.MILLI_OF_SECOND, 3).toFormatter();
private static final String MILLIS_GRANULARITY_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
private static DateTimeFormatter MILLIS_GRANULARITY_DATE_FORMATTER = DateTimeFormatter.ofPattern(MILLIS_GRANULARITY_DATE_FORMAT);

// The last Instant timestamp generated
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
private static final String ALL_ZERO_TIMESTAMP = "00000000000000";

// The default number of milliseconds that we add if they are not present
// We prefer the max timestamp as it mimics the current behavior with second granularity
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
private static final String DEFAULT_MILLIS_EXT = "999";

/**
* Returns next instant time that adds N milliseconds to the current time.
* Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
Expand All @@ -52,36 +67,65 @@ public static String createNewInstantTime(long milliseconds) {
String newCommitTime;
do {
Date d = new Date(System.currentTimeMillis() + milliseconds);
newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
} while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime;
});
}

public static Date parseInstantTime(String timestamp) throws ParseException {
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
try {
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
// Enables backwards compatibility with non-millisecond granularity instants
String timestampInMillis = timestamp;
if (isSecondGranularity(timestamp)) {
// Add milliseconds to the instant in order to parse successfully
timestampInMillis = timestamp + DEFAULT_MILLIS_EXT;
} else if (timestamp.length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
// compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with ms granularity
timestampInMillis = timestamp.substring(0, MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH);
}

LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
} catch (DateTimeParseException e) {
// Special handling for all zero timestamp which is not parsable by DateTimeFormatter
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
return new Date(0);
}
// compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity
if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
}
throw e;
}
}

private static boolean isSecondGranularity(String instant) {
return instant.length() == SECS_INSTANT_ID_LENGTH;
}

public static String formatInstantTime(Instant timestamp) {
return INSTANT_TIME_FORMATTER.format(timestamp);
return MILLIS_INSTANT_TIME_FORMATTER.format(timestamp);
}

public static String formatInstantTime(Date timestamp) {
return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp));
public static String formatDate(Date timestamp) {
return getInstantFromTemporalAccessor(convertDateToTemporalAccessor(timestamp));
}

public static String getInstantFromTemporalAccessor(TemporalAccessor temporalAccessor) {
return MILLIS_INSTANT_TIME_FORMATTER.format(temporalAccessor);
}

/**
* Creates an instant string given a valid date-time string.
* @param dateString A date-time string in the format yyyy-MM-dd HH:mm:ss[:SSS]
* @return A timeline instant
* @throws ParseException If we cannot parse the date string
*/
public static String getInstantForDateString(String dateString) {
try {
return getInstantFromTemporalAccessor(LocalDateTime.parse(dateString, MILLIS_GRANULARITY_DATE_FORMATTER));
} catch (Exception e) {
// Attempt to add the milliseconds in order to complete parsing
return getInstantFromTemporalAccessor(LocalDateTime.parse(
String.format("%s:%s", dateString, DEFAULT_MILLIS_EXT), MILLIS_GRANULARITY_DATE_FORMATTER));
}
}

private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ public void setUp() throws IOException {

@Test
public void testMakeDataFileName() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString();
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
}

@Test
public void testMaskFileName() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String instantTime = HoodieActiveTimeline.formatDate(new Date());
int taskPartitionId = 2;
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
}
Expand Down Expand Up @@ -154,7 +154,7 @@ public void testProcessFiles() throws Exception {

@Test
public void testGetCommitTime() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
Expand All @@ -165,7 +165,7 @@ public void testGetCommitTime() {

@Test
public void testGetFileNameWithoutMeta() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(fileName, FSUtils.getFileId(fullFileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TestHoodieWriteStat {

@Test
public void testSetPaths() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
String instantTime = HoodieActiveTimeline.formatDate(new Date());
String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ public void testCreateNewInstantTime() throws Exception {
}

// All zero timestamp can be parsed
HoodieActiveTimeline.parseInstantTime("00000000000000");
HoodieActiveTimeline.parseDateFromInstantTime("00000000000000");

// Multiple thread test
final int numChecks = 100000;
Expand All @@ -455,9 +455,9 @@ public void testCreateNewInstantTime() throws Exception {
for (int idx = 0; idx < numThreads; ++idx) {
futures.add(executorService.submit(() -> {
Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear);
final String expectedFormat = HoodieActiveTimeline.formatInstantTime(date);
final String expectedFormat = HoodieActiveTimeline.formatDate(date);
for (int tidx = 0; tidx < numChecks; ++tidx) {
final String curFormat = HoodieActiveTimeline.formatInstantTime(date);
final String curFormat = HoodieActiveTimeline.formatDate(date);
if (!curFormat.equals(expectedFormat)) {
throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat);
}
Expand All @@ -476,16 +476,37 @@ public void testCreateNewInstantTime() throws Exception {
@Test
public void testMetadataCompactionInstantDateParsing() throws ParseException {
// default second granularity instant ID
String secondGranularityInstant = "20210101120101";
Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant);
String secondGranularityInstant = "20210101120101123";
Date defaultSecsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant);
// metadata table compaction/cleaning : ms granularity instant ID
String compactionInstant = secondGranularityInstant + "001";
Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant);
assertEquals(0, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0");
Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(compactionInstant);
assertEquals(0, defaultMsGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0");
assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant));
assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant));
}

@Test
public void testMillisGranularityInstantDateParsing() throws ParseException {
// Old second granularity instant ID
String secondGranularityInstant = "20210101120101";
Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant);
// New ms granularity instant ID
String specificMsGranularityInstant = secondGranularityInstant + "009";
Date msGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(specificMsGranularityInstant);
assertEquals(999, defaultMsGranularityDate.getTime() % 1000, "Expected the ms part to be 999");
assertEquals(9, msGranularityDate.getTime() % 1000, "Expected the ms part to be 9");

// Ensure that any date math which expects second granularity still works
String laterDateInstant = "20210101120111"; // + 10 seconds from original instant
assertEquals(
10,
HoodieActiveTimeline.parseDateFromInstantTime(laterDateInstant).getTime() / 1000
- HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant).getTime() / 1000,
"Expected the difference between later instant and previous instant to be 10 seconds"
);
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public static String makeNewCommitTime() {
}

public static String makeNewCommitTime(Instant dateTime) {
return HoodieActiveTimeline.formatInstantTime(Date.from(dateTime));
return HoodieActiveTimeline.formatDate(Date.from(dateTime));
}

public static List<String> makeIncrementalCommitTimes(int num) {
Expand Down
Loading