diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
index 5f08f0097a451..a95cc53df329c 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/CommitUtil.java
@@ -51,7 +51,7 @@ public static long countNewRecords(HoodieTableMetaClient target, List<String> co
 
   public static String getTimeDaysAgo(int numberOfDays) {
     Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
-    return HoodieActiveTimeline.formatInstantTime(date);
+    return HoodieActiveTimeline.formatDate(date);
   }
 
   /**
@@ -61,8 +61,8 @@ public static String getTimeDaysAgo(int numberOfDays) {
    * b) hours: -1, returns 20200202010000
    */
   public static String addHours(String compactionCommitTime, int hours) throws ParseException {
-    Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant();
+    Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
     ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
-    return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant()));
+    return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index c2089466fe14b..37bcc1f30af48 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -233,7 +233,7 @@ void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String
     if (writeTimer != null) {
       long durationInMs = metrics.getDurationInMs(writeTimer.stop());
-      metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs,
+      metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
           metadata, actionType);
       writeTimer = null;
     }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
index 123f790d36df4..87c5a7c310445 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/ScheduleCompactionActionExecutor.java
@@ -184,7 +184,7 @@ private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy)
   private Long parsedToSeconds(String time) {
     long timestamp;
     try {
-      timestamp = HoodieActiveTimeline.parseInstantTime(time).getTime() / 1000;
+      timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
    } catch (ParseException e) {
       throw new HoodieCompactionException(e.getMessage(), e);
     }
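A minimal usage sketch of the renamed helpers, for reviewers scanning the call-site churn above (illustrative only, not part of the patch; the instant values are made up):

```java
import java.text.ParseException;
import java.util.Date;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

public class InstantTimeRenameSketch {
  public static void main(String[] args) throws ParseException {
    // formatDate (formerly formatInstantTime(Date)) now renders instants at
    // millisecond granularity: yyyyMMddHHmmssSSS, 17 characters.
    String instant = HoodieActiveTimeline.formatDate(new Date());
    System.out.println(instant);

    // parseDateFromInstantTime (formerly parseInstantTime) still accepts the
    // old 14-character second-granularity instants for backwards compatibility.
    Date oldStyle = HoodieActiveTimeline.parseDateFromInstantTime("20200202020000");
    Date newStyle = HoodieActiveTimeline.parseDateFromInstantTime(instant);
    System.out.println(oldStyle + " / " + newStyle);
  }
}
```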
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d44a2432f362..cb244ff6bad6f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -371,7 +371,7 @@ public void completeCompaction(
     if (compactionTimer != null) {
       long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
       try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
             durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
       } catch (ParseException e) {
         throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 891800bb992d0..501ae9304f218 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -324,7 +324,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, JavaRDD<WriteSt
     if (compactionTimer != null) {
       long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
       try {
-        metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(),
+        metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
             durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
       } catch (ParseException e) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java
@@ ... @@
   private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
   private static final String ALL_ZERO_TIMESTAMP = "00000000000000";
 
+  // The default number of milliseconds that we add if they are not present
+  // We prefer the max timestamp as it mimics the current behavior with second granularity
+  // when performing comparisons such as LESS_THAN_OR_EQUAL_TO
+  private static final String DEFAULT_MILLIS_EXT = "999";
+
   /**
    * Returns next instant time that adds N milliseconds to the current time.
    * Ensures each instant time is atleast 1 second apart since we create instant times at second granularity
@@ -52,36 +67,65 @@ public static String createNewInstantTime(long milliseconds) {
       String newCommitTime;
       do {
         Date d = new Date(System.currentTimeMillis() + milliseconds);
-        newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
+        newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
       } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
       return newCommitTime;
     });
   }
 
-  public static Date parseInstantTime(String timestamp) throws ParseException {
+  public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
     try {
-      LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
+      // Enables backwards compatibility with non-millisecond granularity instants
+      String timestampInMillis = timestamp;
+      if (isSecondGranularity(timestamp)) {
+        // Add milliseconds to the instant in order to parse successfully
+        timestampInMillis = timestamp + DEFAULT_MILLIS_EXT;
+      } else if (timestamp.length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
+        // compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with ms granularity
+        timestampInMillis = timestamp.substring(0, MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH);
+      }
+
+      LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
       return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
     } catch (DateTimeParseException e) {
       // Special handling for all zero timestamp which is not parsable by DateTimeFormatter
       if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
         return new Date(0);
       }
-      // compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity
-      if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
-        LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
-        return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
-      }
       throw e;
     }
   }
 
+  private static boolean isSecondGranularity(String instant) {
+    return instant.length() == SECS_INSTANT_ID_LENGTH;
+  }
+
   public static String formatInstantTime(Instant timestamp) {
-    return INSTANT_TIME_FORMATTER.format(timestamp);
+    return MILLIS_INSTANT_TIME_FORMATTER.format(timestamp);
   }
 
-  public static String formatInstantTime(Date timestamp) {
-    return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp));
+  public static String formatDate(Date timestamp) {
+    return getInstantFromTemporalAccessor(convertDateToTemporalAccessor(timestamp));
+  }
+
+  public static String getInstantFromTemporalAccessor(TemporalAccessor temporalAccessor) {
+    return MILLIS_INSTANT_TIME_FORMATTER.format(temporalAccessor);
+  }
+
+  /**
+   * Creates an instant string given a valid date-time string.
+   * @param dateString A date-time string in the format yyyy-MM-dd HH:mm:ss[:SSS]
+   * @return A timeline instant
+   * @throws ParseException If we cannot parse the date string
+   */
+  public static String getInstantForDateString(String dateString) {
+    try {
+      return getInstantFromTemporalAccessor(LocalDateTime.parse(dateString, MILLIS_GRANULARITY_DATE_FORMATTER));
+    } catch (Exception e) {
+      // Attempt to add the milliseconds in order to complete parsing
+      return getInstantFromTemporalAccessor(LocalDateTime.parse(
+          String.format("%s:%s", dateString, DEFAULT_MILLIS_EXT), MILLIS_GRANULARITY_DATE_FORMATTER));
+    }
   }
 
   private static TemporalAccessor convertDateToTemporalAccessor(Date d) {
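The backwards-compatibility rules the hunk above encodes, condensed into a standalone sketch (illustrative; the values 14 and 17 are inlined from `SECS_INSTANT_ID_LENGTH` and `MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH`):

```java
public class InstantNormalizationSketch {
  // Condensed form of the branching in parseDateFromInstantTime above.
  static String normalize(String instant) {
    if (instant.length() == 14) {
      // Second-granularity instant: pad with the max millis value ("999") so
      // comparisons such as LESS_THAN_OR_EQUAL_TO behave as they did before.
      return instant + "999";
    } else if (instant.length() > 17) {
      // Metadata-table compaction/cleaning instants carry extra suffix
      // characters; trim to 17 and treat as millisecond granularity.
      return instant.substring(0, 17);
    }
    return instant; // already a 17-char millisecond instant
  }

  public static void main(String[] args) {
    System.out.println(normalize("20210101120101"));       // 20210101120101999
    System.out.println(normalize("20210101120101123001")); // 20210101120101123
    System.out.println(normalize("20210101120101123"));    // 20210101120101123
  }
}
```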
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index 65c729e7aaed3..0a2c5b4ea3db8 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -79,14 +79,14 @@ public void setUp() throws IOException {
 
   @Test
   public void testMakeDataFileName() {
-    String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
+    String instantTime = HoodieActiveTimeline.formatDate(new Date());
     String fileName = UUID.randomUUID().toString();
     assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
   }
 
   @Test
   public void testMaskFileName() {
-    String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
+    String instantTime = HoodieActiveTimeline.formatDate(new Date());
     int taskPartitionId = 2;
     assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
   }
@@ -154,7 +154,7 @@ public void testProcessFiles() throws Exception {
 
   @Test
   public void testGetCommitTime() {
-    String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
+    String instantTime = HoodieActiveTimeline.formatDate(new Date());
     String fileName = UUID.randomUUID().toString();
     String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
     assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
@@ -165,7 +165,7 @@ public void testGetCommitTime() {
 
   @Test
   public void testGetFileNameWithoutMeta() {
-    String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
+    String instantTime = HoodieActiveTimeline.formatDate(new Date());
     String fileName = UUID.randomUUID().toString();
     String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
     assertEquals(fileName, FSUtils.getFileId(fullFileName));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
index 8fb9dddaa2e86..f8995ab4c07ac 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
@@ -36,7 +36,7 @@ public class TestHoodieWriteStat {
 
   @Test
   public void testSetPaths() {
-    String instantTime = HoodieActiveTimeline.formatInstantTime(new Date());
+    String instantTime = HoodieActiveTimeline.formatDate(new Date());
     String basePathString = "/data/tables/some-hoodie-table";
     String partitionPathString = "2017/12/31";
     String fileName = UUID.randomUUID().toString();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 97de617b545af..e7ecde167c38f 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -444,7 +444,7 @@ public void testCreateNewInstantTime() throws Exception {
     }
 
     // All zero timestamp can be parsed
-    HoodieActiveTimeline.parseInstantTime("00000000000000");
+    HoodieActiveTimeline.parseDateFromInstantTime("00000000000000");
 
     // Multiple thread test
     final int numChecks = 100000;
@@ -455,9 +455,9 @@ public void testCreateNewInstantTime() throws Exception {
     for (int idx = 0; idx < numThreads; ++idx) {
       futures.add(executorService.submit(() -> {
         Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear);
-        final String expectedFormat = HoodieActiveTimeline.formatInstantTime(date);
+        final String expectedFormat = HoodieActiveTimeline.formatDate(date);
         for (int tidx = 0; tidx < numChecks; ++tidx) {
-          final String curFormat = HoodieActiveTimeline.formatInstantTime(date);
+          final String curFormat = HoodieActiveTimeline.formatDate(date);
           if (!curFormat.equals(expectedFormat)) {
             throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat);
           }
@@ -476,16 +476,37 @@ public void testCreateNewInstantTime() throws Exception {
   @Test
   public void testMetadataCompactionInstantDateParsing() throws ParseException {
     // default second granularity instant ID
-    String secondGranularityInstant = "20210101120101";
-    Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant);
+    String secondGranularityInstant = "20210101120101123";
"20210101120101123"; + Date defaultSecsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant); // metadata table compaction/cleaning : ms granularity instant ID String compactionInstant = secondGranularityInstant + "001"; - Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant); - assertEquals(0, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0"); + Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(compactionInstant); + assertEquals(0, defaultMsGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0"); assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant)); assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant)); } + @Test + public void testMillisGranularityInstantDateParsing() throws ParseException { + // Old second granularity instant ID + String secondGranularityInstant = "20210101120101"; + Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant); + // New ms granularity instant ID + String specificMsGranularityInstant = secondGranularityInstant + "009"; + Date msGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(specificMsGranularityInstant); + assertEquals(999, defaultMsGranularityDate.getTime() % 1000, "Expected the ms part to be 999"); + assertEquals(9, msGranularityDate.getTime() % 1000, "Expected the ms part to be 9"); + + // Ensure that any date math which expects second granularity still works + String laterDateInstant = "20210101120111"; // + 10 seconds from original instant + assertEquals( + 10, + HoodieActiveTimeline.parseDateFromInstantTime(laterDateInstant).getTime() / 1000 + - HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant).getTime() / 1000, + "Expected the difference between later instant and previous instant to be 10 seconds" + ); + } + /** * Returns an exhaustive list of all possible HoodieInstant. 
   * @return list of HoodieInstant
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index b6ea32db25b36..3057fa065db86 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -148,7 +148,7 @@ public static String makeNewCommitTime() {
   }
 
   public static String makeNewCommitTime(Instant dateTime) {
-    return HoodieActiveTimeline.formatInstantTime(Date.from(dateTime));
+    return HoodieActiveTimeline.formatDate(Date.from(dateTime));
   }
 
   public static List<String> makeIncrementalCommitTimes(int num) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4b9a516106426..8954a8b2db538 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -403,12 +403,12 @@ public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throw
    */
   public static Option<String> medianInstantTime(String highVal, String lowVal) {
     try {
-      long high = HoodieActiveTimeline.parseInstantTime(highVal).getTime();
-      long low = HoodieActiveTimeline.parseInstantTime(lowVal).getTime();
+      long high = HoodieActiveTimeline.parseDateFromInstantTime(highVal).getTime();
+      long low = HoodieActiveTimeline.parseDateFromInstantTime(lowVal).getTime();
       ValidationUtils.checkArgument(high > low,
           "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
       long median = low + (high - low) / 2;
-      final String instantTime = HoodieActiveTimeline.formatInstantTime(new Date(median));
+      final String instantTime = HoodieActiveTimeline.formatDate(new Date(median));
       if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)
           || HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) {
         return Option.empty();
@@ -424,8 +424,8 @@ public static Option<String> medianInstantTime(String highVal, String lowVal) {
    */
   public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
     try {
-      long newTimestamp = HoodieActiveTimeline.parseInstantTime(newInstantTime).getTime();
-      long oldTimestamp = HoodieActiveTimeline.parseInstantTime(oldInstantTime).getTime();
+      long newTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(newInstantTime).getTime();
+      long oldTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(oldInstantTime).getTime();
       return (newTimestamp - oldTimestamp) / 1000;
     } catch (ParseException e) {
       throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
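A hand check of the median arithmetic for the instants used in the updated test that follows, applying the "999" padding rule from HoodieInstantTimeGenerator (a sanity sketch, not part of the patch):

```java
public class MedianInstantHandCheck {
  public static void main(String[] args) {
    // Both test inputs are 14-char instants, so parsing pads them with "999":
    //   low  = "20210705125806" -> 2021-07-05 12:58:06.999
    //   high = "20210705125921" -> 2021-07-05 12:59:21.999, i.e. 75,000 ms later
    long spanMs = 75_000;
    long offset = spanMs / 2; // 37,500 ms past low
    // 12:58:06.999 + 37,500 ms = 12:58:44.499, which formats at ms granularity to:
    System.out.println("expected median instant: 20210705125844499 (offset " + offset + " ms)");
  }
}
```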
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index b9e2b916da3c8..e00fbfac50ce3 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -80,8 +80,9 @@ void testInitTableIfNotExists() throws IOException {
   void testMedianInstantTime() {
     String higher = "20210705125921";
     String lower = "20210705125806";
+    String expectedMedianInstant = "20210705125844499";
     String median1 = StreamerUtil.medianInstantTime(higher, lower).get();
-    assertThat(median1, is("20210705125843"));
+    assertThat(median1, is(expectedMedianInstant));
     // test symmetry
     assertThrows(IllegalArgumentException.class,
         () -> StreamerUtil.medianInstantTime(lower, higher),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
index c8ac6e318c8dd..aa4ea91daf24c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hudi
 import scala.collection.JavaConverters._
 import java.net.URI
 import java.util.{Date, Locale, Properties}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -31,8 +30,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
-
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -50,7 +48,6 @@ import java.text.SimpleDateFormat
 import scala.collection.immutable.Map
 
 object HoodieSqlUtils extends SparkAdapterSupport {
-  private val defaultDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
   private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")
 
   def isHoodieTable(table: CatalogTable): Boolean = {
@@ -293,13 +290,15 @@ object HoodieSqlUtils extends SparkAdapterSupport {
    * 3、yyyyMMddHHmmss
    */
   def formatQueryInstant(queryInstant: String): String = {
-    if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss
-      HoodieActiveTimeline.formatInstantTime(defaultDateTimeFormat.parse(queryInstant))
-    } else if (queryInstant.length == 14) { // for yyyyMMddHHmmss
-      HoodieActiveTimeline.parseInstantTime(queryInstant) // validate the format
+    val instantLength = queryInstant.length
+    if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
+      HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
+    } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_TIMESTAMP_FORMAT.length
+      || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
+      HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
       queryInstant
-    } else if (queryInstant.length == 10) { // for yyyy-MM-dd
-      HoodieActiveTimeline.formatInstantTime(defaultDateFormat.parse(queryInstant))
+    } else if (instantLength == 10) { // for yyyy-MM-dd
+      HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant))
     } else {
       throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant,"
         + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")
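For reference, the query-instant formats the rewritten formatQueryInstant accepts, with hand-derived results (illustrative only; the 19-char case assumes the getInstantForDateString fallback that appends the default millis, and the 23-char separator follows the `%s:%s` fallback shown earlier):

```java
public class QueryInstantFormats {
  public static void main(String[] args) {
    // length 19, yyyy-MM-dd HH:mm:ss      -> millis default to 999 via getInstantForDateString
    System.out.println("2021-07-05 12:58:06     -> 20210705125806999");
    // length 23, explicit millis          -> preserved as given
    System.out.println("2021-07-05 12:58:06:123 -> 20210705125806123");
    // lengths 14 and 17 (yyyyMMddHHmmss[SSS]) are validated and returned as-is
    System.out.println("20210705125806          -> 20210705125806");
    System.out.println("20210705125806123       -> 20210705125806123");
    // length 10, yyyy-MM-dd -> midnight, formatted at millisecond granularity
    System.out.println("2021-07-05              -> 20210705000000000");
    // anything else -> IllegalArgumentException
  }
}
```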
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
index ccb3191e187e3..ffe9b64984027 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/streaming/HoodieStreamSource.scala
@@ -179,10 +179,10 @@ class HoodieStreamSource(
     startOffset match {
       case INIT_OFFSET => startOffset.commitTime
       case HoodieSourceOffset(commitTime) =>
-        val time = HoodieActiveTimeline.parseInstantTime(commitTime).getTime
+        val time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime
         // As we consume the data between (start, end], start is not included,
         // so we +1s to the start commit time here.
-        HoodieActiveTimeline.formatInstantTime(new Date(time + 1000))
+        HoodieActiveTimeline.formatDate(new Date(time + 1000))
       case _=> throw new IllegalStateException("UnKnow offset type.")
     }
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index c4af71768b167..55f90f0ddef45 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -218,13 +218,13 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
   }
 
   private def defaultDateTimeFormat(queryInstant: String): String = {
-    val date = HoodieActiveTimeline.parseInstantTime(queryInstant)
-    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
+    val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
+    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
     format.format(date)
   }
 
   private def defaultDateFormat(queryInstant: String): String = {
-    val date = HoodieActiveTimeline.parseInstantTime(queryInstant)
+    val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
     val format = new SimpleDateFormat("yyyy-MM-dd")
     format.format(date)
   }
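A sketch of the streaming start-offset bump touched above (the offset value is hypothetical, not part of the patch):

```java
import java.text.ParseException;
import java.util.Date;

import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;

public class StreamStartOffsetSketch {
  public static void main(String[] args) throws ParseException {
    // The source consumes (start, end], so the stored commit time is advanced
    // before being reused as the next start. The bump stays a full second
    // (1000 ms) even though instants now carry milliseconds.
    String commitTime = "20210705125806123"; // hypothetical stored offset
    long time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime();
    System.out.println(HoodieActiveTimeline.formatDate(new Date(time + 1000)));
    // -> 20210705125807123
  }
}
```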
LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again."); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java index 28ba17efa9f46..3ac490bf9163e 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHDFSParquetImporter.java @@ -230,7 +230,7 @@ public void testImportWithUpsert() throws IOException, ParseException { public List createInsertRecords(Path srcFolder) throws ParseException, IOException { Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); - long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; + long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); for (long recordNum = 0; recordNum < 96; recordNum++) { records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, @@ -247,7 +247,7 @@ public List createInsertRecords(Path srcFolder) throws ParseExcep public List createUpsertRecords(Path srcFolder) throws ParseException, IOException { Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); - long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; + long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000; List records = new ArrayList(); // 10 for update for (long recordNum = 0; recordNum < 11; recordNum++) {