From 1c54104310d872e2e37c7d0f391fc47b37f9648d Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Thu, 11 Nov 2021 12:46:53 -0500 Subject: [PATCH 1/2] Fixing parsing of metadata table compaction timestamp --- .../functional/TestHoodieBackedMetadata.java | 48 ++++++++++++------- .../functional/TestHoodieMetadataBase.java | 7 ++- .../timeline/HoodieInstantTimeGenerator.java | 14 +++++- .../metadata/HoodieBackedTableMetadata.java | 3 +- .../timeline/TestHoodieActiveTimeline.java | 14 ++++++ 5 files changed, 64 insertions(+), 22 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 9ece523ff472b..e44ddd03b3b3b 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -223,7 +223,7 @@ public void testOnlyValidPartitionsAdded(HoodieTableType tableType) throws Excep @ParameterizedTest @MethodSource("bootstrapAndTableOperationTestArgs") public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception { - init(tableType, true, enableFullScan); + init(tableType, true, enableFullScan, false); doWriteInsertAndUpsert(testTable); // trigger an upsert @@ -462,27 +462,43 @@ public void testSync(HoodieTableType tableType) throws Exception { validateMetadata(testTable, emptyList(), true); } + /** + * Fetches next commit time in seconds from current one. + * + * @param curCommitTime current commit time. + * @return the next valid commit time. 
+ */ + private Long getNextCommitTime(long curCommitTime) { + if ((curCommitTime + 1) % 1000000000000L >= 60) { + return Long.parseLong(HoodieActiveTimeline.createNewInstantTime()); + } else { + return curCommitTime + 1; + } + } + @ParameterizedTest @EnumSource(HoodieTableType.class) public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception { - init(tableType); + init(tableType, true, true, true); + long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime()); for (int i = 1; i < 25; i += 7) { - String commitTime1 = ((i > 9) ? ("00000") : ("000000")) + i; - String commitTime2 = ((i > 9) ? ("00000") : ("000000")) + (i + 1); - String commitTime3 = ((i > 9) ? ("00000") : ("000000")) + (i + 2); - String commitTime4 = ((i > 9) ? ("00000") : ("000000")) + (i + 3); - String commitTime5 = ((i > 9) ? ("00000") : ("000000")) + (i + 4); - String commitTime6 = ((i > 9) ? ("00000") : ("000000")) + (i + 5); - String commitTime7 = ((i > 9) ? ("00000") : ("000000")) + (i + 6); - doWriteOperation(testTable, commitTime1, INSERT); - doWriteOperation(testTable, commitTime2); - doClean(testTable, commitTime3, Arrays.asList(commitTime1)); - doWriteOperation(testTable, commitTime4); + long commitTime1 = getNextCommitTime(baseCommitTime); + long commitTime2 = getNextCommitTime(commitTime1); + long commitTime3 = getNextCommitTime(commitTime2); + long commitTime4 = getNextCommitTime(commitTime3); + long commitTime5 = getNextCommitTime(commitTime4); + long commitTime6 = getNextCommitTime(commitTime5); + long commitTime7 = getNextCommitTime(commitTime6); + baseCommitTime = commitTime7; + doWriteOperation(testTable, Long.toString(commitTime1), INSERT); + doWriteOperation(testTable, Long.toString(commitTime2)); + doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1))); + doWriteOperation(testTable, Long.toString(commitTime4)); if (tableType == MERGE_ON_READ) { - doCompaction(testTable, commitTime5); + 
doCompaction(testTable, Long.toString(commitTime5)); } - doWriteOperation(testTable, commitTime6); - doRollback(testTable, commitTime6, commitTime7); + doWriteOperation(testTable, Long.toString(commitTime6)); + doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7)); } validateMetadata(testTable, emptyList(), true); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java index cf261cc8994a1..5e4c4ba86632a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java @@ -75,10 +75,10 @@ public void init(HoodieTableType tableType) throws IOException { } public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException { - init(tableType, enableMetadataTable, true); + init(tableType, enableMetadataTable, true, false); } - public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException { + public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException { this.tableType = tableType; initPath(); initSparkContexts("TestHoodieMetadata"); @@ -87,8 +87,7 @@ public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean initMetaClient(tableType); initTestDataGenerator(); metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); - writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false, - enableFullScan).build(); + writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build(); 
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index 817b39254ef05..9d4c17bb9ef5e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -18,6 +18,8 @@ package org.apache.hudi.common.table.timeline; +import java.text.ParseException; +import java.text.SimpleDateFormat; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -35,6 +37,10 @@ public class HoodieInstantTimeGenerator { private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"; // Formatter to generate Instant timestamps private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT); + private static final String MILLIS_COMMIT_FORMAT = "yyyyMMddHHmmssSSS"; + private static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_COMMIT_FORMAT.length(); + // Formatter to generate Instant timestamps for millsecs format + private static final SimpleDateFormat MILLIS_COMMIT_FORMATTER = new SimpleDateFormat(MILLIS_COMMIT_FORMAT); // The last Instant timestamp generated private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); private static final String ALL_ZERO_TIMESTAMP = "00000000000000"; @@ -56,7 +62,7 @@ public static String createNewInstantTime(long milliseconds) { }); } - public static Date parseInstantTime(String timestamp) { + public static Date parseInstantTime(String timestamp) throws ParseException { try { LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER); return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); @@ -65,6 +71,12 @@ public static Date 
parseInstantTime(String timestamp) { if (timestamp.equals(ALL_ZERO_TIMESTAMP)) { return new Date(0); } + // compaction and cleaning in metadata has special format. handling it here. + if (timestamp.length() == MILLIS_INSTANT_ID_LENGTH) { + // we could either trim the last 3 characters and parse it with Secs granularity. but we will revisit this when we add support for + // ms level commit timestamp. + return MILLIS_COMMIT_FORMATTER.parse(timestamp); + } throw e; } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 1ee8a78e3e2ca..6cc5533f1c515 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -184,8 +184,9 @@ private List>>> readFrom HoodieRecord hoodieRecord = null; // Retrieve record from base file if (baseFileReader != null) { - HoodieTimer readTimer = new HoodieTimer().startTimer(); + HoodieTimer readTimer = new HoodieTimer(); for (String key : keys) { + readTimer.startTimer(); Option baseRecord = baseFileReader.getRecordByKey(key); if (baseRecord.isPresent()) { hoodieRecord = metadataTableConfig.populateMetaFields() diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index 5f2d6928cbaaa..abbb2b14b2003 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -472,6 +473,19 @@ public void 
testCreateNewInstantTime() throws Exception { } } + @Test + public void testMetadataCompactionInstantDateParsing() throws ParseException { + // Old second granularity instant ID + String secondGranularityInstant = "20210101120101"; + Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant); + // New ms granularity instant ID + String compactionInstant = secondGranularityInstant + "001"; + Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant); + assertEquals(1, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 999"); + assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant)); + assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant)); + } + /** * Returns an exhaustive list of all possible HoodieInstant. * @return list of HoodieInstant From 0baab999dfe3697d620f773902019e8b0c1f7019 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 12 Nov 2021 08:36:29 -0500 Subject: [PATCH 2/2] simplifying the fix --- .../functional/TestHoodieBackedMetadata.java | 2 +- .../timeline/HoodieInstantTimeGenerator.java | 16 +++++----------- .../table/timeline/TestHoodieActiveTimeline.java | 6 +++--- 3 files changed, 9 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index e44ddd03b3b3b..49eddc24ef0d8 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -469,7 +469,7 @@ public void testSync(HoodieTableType tableType) throws Exception { * @return the next 
valid commit time. */ private Long getNextCommitTime(long curCommitTime) { - if ((curCommitTime + 1) % 1000000000000L >= 60) { + if ((curCommitTime + 1) % 1000000000000L >= 60) { // seconds cannot reach 60, hence generate a fresh instant time instead of incrementing return Long.parseLong(HoodieActiveTimeline.createNewInstantTime()); } else { return curCommitTime + 1; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java index 9d4c17bb9ef5e..f26f52c9435b7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstantTimeGenerator.java @@ -19,7 +19,6 @@ package org.apache.hudi.common.table.timeline; import java.text.ParseException; -import java.text.SimpleDateFormat; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -35,12 +34,9 @@ public class HoodieInstantTimeGenerator { // Format of the timestamp used for an Instant private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"; + private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length(); // Formatter to generate Instant timestamps private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT); - private static final String MILLIS_COMMIT_FORMAT = "yyyyMMddHHmmssSSS"; - private static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_COMMIT_FORMAT.length(); - // Formatter to generate Instant timestamps for millsecs format - private static final SimpleDateFormat MILLIS_COMMIT_FORMATTER = new SimpleDateFormat(MILLIS_COMMIT_FORMAT); // The last Instant timestamp generated private static AtomicReference lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); private static final String ALL_ZERO_TIMESTAMP = "00000000000000"; @@ -71,13 +67,11 @@ public static 
Date parseInstantTime(String timestamp) throws ParseException { if (timestamp.equals(ALL_ZERO_TIMESTAMP)) { return new Date(0); } - // compaction and cleaning in metadata has special format. handling it here. - if (timestamp.length() == MILLIS_INSTANT_ID_LENGTH) { - // we could either trim the last 3 characters and parse it with Secs granularity. but we will revisit this when we add support for - // ms level commit timestamp. - return MILLIS_COMMIT_FORMATTER.parse(timestamp); + // compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity + if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) { + LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER); + return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); } - throw e; } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index abbb2b14b2003..97de617b545af 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -475,13 +475,13 @@ public void testCreateNewInstantTime() throws Exception { @Test public void testMetadataCompactionInstantDateParsing() throws ParseException { - // Old second granularity instant ID + // default second granularity instant ID String secondGranularityInstant = "20210101120101"; Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant); - // New ms granularity instant ID + // metadata table compaction/cleaning : ms granularity instant ID String compactionInstant = secondGranularityInstant + "001"; Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant); - assertEquals(1, 
msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 999"); + assertEquals(0, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0"); assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant)); assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant)); }