Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -307,17 +306,14 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);

void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
try {

if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType);
writeTimer = null;
}
} catch (ParseException e) {
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime
+ "Instant time is not of valid format", e);
if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop());
// instantTime could be a non-standard value, so use `parseDateFromInstantTimeSafely`
// e.g. INIT_INSTANT_TS, METADATA_BOOTSTRAP_INSTANT_TS and FULL_BOOTSTRAP_INSTANT_TS in HoodieTimeline
HoodieActiveTimeline.parseDateFromInstantTimeSafely(instantTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, actionType)
);
writeTimer = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
Expand All @@ -64,7 +63,6 @@
import org.apache.spark.api.java.JavaSparkContext;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -318,13 +316,9 @@ protected void completeCompaction(HoodieCommitMetadata metadata,
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + compactionCommitTime, e);
}
HoodieActiveTimeline.parseDateFromInstantTimeSafely(compactionCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION)
);
}
LOG.info("Compacted successfully on commit " + compactionCommitTime);
}
Expand Down Expand Up @@ -402,13 +396,9 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
+ config.getBasePath() + " at time " + clusteringCommitTime, e);
}
HoodieActiveTimeline.parseDateFromInstantTimeSafely(clusteringCommitTime).ifPresent(parsedInstant ->
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION)
);
}
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,57 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
REQUESTED_INDEX_COMMIT_EXTENSION, INFLIGHT_INDEX_COMMIT_EXTENSION, INDEX_COMMIT_EXTENSION,
REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION));

private static final Set<String> NOT_PARSABLE_TIMESTAMPS = new HashSet<String>(3) {{
add(HoodieTimeline.INIT_INSTANT_TS);
add(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
add(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
}};

private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
protected HoodieTableMetaClient metaClient;

/**
 * Parses the timestamp of an instant and returns the corresponding {@code Date}.
 * Delegates to {@code HoodieInstantTimeGenerator#parseDateFromInstantTime(String)}.
 *
 * @param timestamp a timestamp String that follows the pattern
 *        {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}.
 * @return the {@code Date} represented by the instant timestamp
 * @throws ParseException if the timestamp does not match
 *         {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}
 */
public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp);
}

/**
 * Same parsing as {@link #parseDateFromInstantTime(String)}, but never throws:
 * an unparsable timestamp yields {@code Option.empty()} instead of a ParseException.
 * The reserved timestamps
 * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#INIT_INSTANT_TS},
 * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#METADATA_BOOTSTRAP_INSTANT_TS} and
 * {@link org.apache.hudi.common.table.timeline.HoodieTimeline#FULL_BOOTSTRAP_INSTANT_TS}
 * are plain numeric strings and are mapped to a {@code Date} from their numeric value.
 * This method is useful when parsing timestamps for metrics.
 *
 * @param timestamp a timestamp String which should follow the pattern
 *        {@link org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator#SECS_INSTANT_TIMESTAMP_FORMAT}.
 * @return {@code Option<Date>} of the instant timestamp, {@code Option.empty()} if the timestamp is invalid
 */
public static Option<Date> parseDateFromInstantTimeSafely(String timestamp) {
try {
  return Option.of(HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp));
} catch (ParseException e) {
  if (NOT_PARSABLE_TIMESTAMPS.contains(timestamp)) {
    // Reserved timestamps are numeric strings; interpret them directly as epoch millis.
    // Long.parseLong (not Integer.parseInt) avoids overflow on 14-digit values, and
    // Date(long) takes a long anyway. The known constants (values 0/1/2) are unaffected.
    return Option.of(new Date(Long.parseLong(timestamp)));
  }
  LOG.warn("Failed to parse timestamp " + timestamp + ": " + e.getMessage());
  return Option.empty();
}
}

/**
* Format the Date to a String representing the timestamp of a Hoodie Instant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public class HoodieInstantTimeGenerator {

// The last Instant timestamp generated
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
private static final String ALL_ZERO_TIMESTAMP = "00000000000000";

// The default number of milliseconds that we add if they are not present
// We prefer the max timestamp as it mimics the current behavior with second granularity
Expand Down Expand Up @@ -96,11 +95,7 @@ public static Date parseDateFromInstantTime(String timestamp) throws ParseExcept
LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
} catch (DateTimeParseException e) {
// Special handling for all zero timestamp which is not parsable by DateTimeFormatter
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
return new Date(0);
}
throw e;
throw new ParseException(e.getMessage(), e.getErrorIndex());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,9 +567,6 @@ public void testCreateNewInstantTime() throws Exception {
lastInstantTime = newInstantTime;
}

// All zero timestamp can be parsed
HoodieActiveTimeline.parseDateFromInstantTime("00000000000000");

// Multiple thread test
final int numChecks = 100000;
final int numThreads = 100;
Expand Down Expand Up @@ -631,6 +628,26 @@ public void testMillisGranularityInstantDateParsing() throws ParseException {
);
}

@Test
public void testInvalidInstantDateParsing() throws ParseException {
// The reserved init/bootstrap timestamps are not valid date strings, but the safe
// parser must still map each of them to a Date whose epoch millis equal its numeric value.
String[] reservedTimestamps = new String[] {
    HoodieTimeline.INIT_INSTANT_TS,
    HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
    HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS};
for (String reservedTs : reservedTimestamps) {
  assertEquals(Long.parseLong(reservedTs),
      HoodieActiveTimeline.parseDateFromInstantTimeSafely(reservedTs).get().getTime());
}

// A timestamp outside the reserved set that still fails to parse (e.g. a metadata
// table compaction instant suffixed with "001") must yield Option.empty.
assertEquals(Option.empty(),
    HoodieActiveTimeline.parseDateFromInstantTimeSafely(HoodieTimeline.INIT_INSTANT_TS + "001"));

// A well-formed instant must parse to the same Date as the throwing variant.
String testInstant = "20210101120101";
assertEquals(HoodieActiveTimeline.parseDateFromInstantTime(testInstant).getTime(),
    HoodieActiveTimeline.parseDateFromInstantTimeSafely(testInstant).get().getTime());
}

/**
* Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant
Expand Down