diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 48dd21d28ea6a..e309dd07d4cd0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -104,7 +104,6 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.text.ParseException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -307,17 +306,14 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf); void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) { - try { - - if (writeTimer != null) { - long durationInMs = metrics.getDurationInMs(writeTimer.stop()); - metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs, - metadata, actionType); - writeTimer = null; - } - } catch (ParseException e) { - throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime - + "Instant time is not of valid format", e); + if (writeTimer != null) { + long durationInMs = metrics.getDurationInMs(writeTimer.stop()); + // instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely` + // e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline + HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant -> + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType) + ); + writeTimer = null; } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index bdf478a8f6e34..a142fd80d4bf8 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -41,7 +41,6 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieClusteringException; -import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.SparkHoodieIndexFactory; import org.apache.hudi.metadata.HoodieTableMetadataWriter; @@ -64,7 +63,6 @@ import org.apache.spark.api.java.JavaSparkContext; import java.nio.charset.StandardCharsets; -import java.text.ParseException; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -318,13 +316,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata, .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (compactionTimer != null) { long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); - try { - metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); - } catch (ParseException e) { - throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " - + config.getBasePath() + " at time " + compactionCommitTime, e); - } + HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant -> + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION) + ); } LOG.info("Compacted successfully on commit " + compactionCommitTime); } @@ -402,13 +396,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata, .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); if (clusteringTimer != null) { long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); - try { - metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), - durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); - } catch (ParseException e) { - throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " - + config.getBasePath() + " at time " + clusteringCommitTime, e); - } + HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant -> + metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION) + ); } LOG.info("Clustering successfully on commit " + clusteringCommitTime); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index b3dbe422b10ae..2b27d3ab5e568 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -74,16 +74,57 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION, REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION, REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION)); + + private static final Set NOT_PARSABLE_TIMESTAMPS = new HashSet(3) {{ + add(HoodieTimeline.INIT_INSTANT_TS); + add(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS); + add(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS); + }}; + private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class); protected HoodieTableMetaClient metaClient; /** * Parse the timestamp of an Instant and return a {@code Date}. + * Throw ParseException if timestamp is not valid format as + * {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}. + * + * @param timestamp a timestamp String which follow pattern as + * {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}. + * @return Date of instant timestamp */ public static Date parseDateFromInstantTime(String timestamp) throws ParseException { return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp); } + /** + * The same parsing method as above, but this method will mute ParseException. + * If the given timestamp is invalid, returns {@code Option.empty}. + * Or a corresponding Date value if these timestamp strings are provided + * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS}, + * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#METADATA_BOOTSTRAP_INSTANT_TS}, + * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#FULL_BOOTSTRAP_INSTANT_TS}. + * This method is useful when parsing timestamp for metrics + * + * @param timestamp a timestamp String which follow pattern as + * {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}. + * @return {@code Option} of instant timestamp, {@code Option.empty} if invalid timestamp + */ + public static Option parseDateFromInstantTimeSafely(String timestamp) { + Option parsedDate; + try { + parsedDate = Option.of(HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp)); + } catch (ParseException e) { + if (NOT_PARSABLE_TIMESTAMPS.contains(timestamp)) { + parsedDate = Option.of(new Date(Integer.parseInt(timestamp))); + } else { + LOG.warn("Failed to parse timestamp " + timestamp + ": " + e.getMessage()); + parsedDate = Option.empty(); + } + } + return parsedDate; + } + /** * Format the Date to a String representing the timestamp of a Hoodie Instant. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index 33b847b4c62c5..f2d2d7e29dcb1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -50,7 +50,6 @@ public class HoodieInstantTimeGenerator { // The last Instant timestamp generated private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); - private static final String ALL_ZERO_TIMESTAMP = "00000000000000"; // The default number of milliseconds that we add if they are not present // We prefer the max timestamp as it mimics the current behavior with second granularity @@ -96,11 +95,7 @@ public static Date parseDateFromInstantTime(String timestamp) throws ParseExcept LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER); return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); } catch (DateTimeParseException e) { - // Special handling for all zero timestamp which is not parsable by DateTimeFormatter - if (timestamp.equals(ALL_ZERO_TIMESTAMP)) { - return new Date(0); - } - throw e; + throw new ParseException(e.getMessage(), e.getErrorIndex()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 628aeb8e804b8..1c8d5ece242da 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -567,9 +567,6 @@ public void testCreateNewInstantTime() throws Exception { lastInstantTime = newInstantTime; } - // All zero timestamp can be parsed - HoodieActiveTimeline.parseDateFromInstantTime("00000000000000"); - // Multiple thread test final int numChecks = 100000; final int numThreads = 100; @@ -631,6 +628,26 @@ public void testMillisGranularityInstantDateParsing() throws ParseException { ); } + @Test + public void testInvalidInstantDateParsing() throws ParseException { + // Test all invalid timestamp in HoodieTimeline, shouldn't throw any error and should return a correct value + assertEquals(Long.parseLong(HoodieTimeline.INIT_INSTANT_TS), + HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.INIT_INSTANT_TS).get().getTime()); + assertEquals(Long.parseLong(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), + HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS).get().getTime()); + assertEquals(Long.parseLong(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS), + HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS).get().getTime()); + + // Test metadata table compaction instant date parsing with INIT_INSTANT_TS, should return Option.empty + assertEquals(Option.empty(), + HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.INIT_INSTANT_TS + "001")); + + // Test a valid instant timestamp, should equal the same result as HoodieActiveTimeline.parseDateFromInstantTime + String testInstant = "20210101120101"; + assertEquals(HoodieActiveTimeline.parseDateFromInstantTime(testInstant).getTime(), + HoodieActiveTimeline.parseDateFromInstantTimeSafely(testInstant).get().getTime()); + } + /** * Returns an exhaustive list of all possible HoodieInstant. * @return list of HoodieInstant