From bdc8888ac7c5442d3f79cd75fc02e13d606e4bd6 Mon Sep 17 00:00:00 2001 From: "yuzhao.cyz" Date: Tue, 14 Dec 2021 15:38:58 +0800 Subject: [PATCH] [HUDI-3015] Implement #reset and #sync for metadata filesystem view --- .../client/TestHoodieClientMultiWriter.java | 59 +++++++++---------- .../view/AbstractTableFileSystemView.java | 5 +- .../view/PriorityBasedFileSystemView.java | 4 +- .../view/RemoteHoodieTableFileSystemView.java | 3 +- .../FileSystemBackedTableMetadata.java | 5 ++ .../metadata/HoodieBackedTableMetadata.java | 6 ++ .../HoodieMetadataFileSystemView.java | 12 ++++ .../hudi/metadata/HoodieTableMetadata.java | 5 ++ .../view/TestPriorityBasedFileSystemView.java | 4 +- 9 files changed, 63 insertions(+), 40 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 6dd9ff12b5310..60979c33521d1 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -100,38 +100,33 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .build()).withAutoCommit(false).withProperties(properties).build(); // Create the first commit createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); - try { - ExecutorService executors = Executors.newFixedThreadPool(2); - SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); - SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); - Future future1 = executors.submit(() -> { - String newCommitTime = "004"; - int numRecords = 100; - String commitTimeBetweenPrevAndNew = "002"; - try { - createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); - } catch (Exception e1) { - assertTrue(e1 instanceof HoodieWriteConflictException); - throw new RuntimeException(e1); - } - }); - Future future2 = executors.submit(() -> { - String newCommitTime = "005"; - int numRecords = 100; - String commitTimeBetweenPrevAndNew = "002"; - try { - createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); - } catch (Exception e2) { - assertTrue(e2 instanceof HoodieWriteConflictException); - throw new RuntimeException(e2); - } - }); - future1.get(); - future2.get(); - fail("Should not reach here, this means concurrent writes were handled incorrectly"); - } catch (Exception e) { - // Expected to fail due to overlapping commits - } + ExecutorService executors = Executors.newFixedThreadPool(2); + SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); + SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + Future future1 = executors.submit(() -> { + String newCommitTime = "004"; + int numRecords = 100; + String commitTimeBetweenPrevAndNew = "002"; + try { + createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); + } catch (Exception e1) { + assertTrue(e1 instanceof HoodieWriteConflictException); + throw new RuntimeException(e1); + } + }); + Future future2 = executors.submit(() -> { + String newCommitTime = "005"; + int numRecords = 100; + String commitTimeBetweenPrevAndNew = "002"; + try { + createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); + } catch (Exception e2) { + assertTrue(e2 instanceof HoodieWriteConflictException); + throw new RuntimeException(e2); + } + }); + future1.get(); + future2.get(); } @Test diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index a626c1b5eba13..92e6171b68327 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -257,7 +257,7 @@ public void close() { * Clears the partition Map and reset view states. */ @Override - public final void reset() { + public void reset() { try { writeLock.lock(); clear(); @@ -1135,8 +1135,7 @@ public void sync() { */ protected void runSync(HoodieTimeline oldTimeline, HoodieTimeline newTimeline) { refreshTimeline(newTimeline); - addedPartitions.clear(); - resetViewState(); + clear(); // Initialize with new Hoodie timeline. init(metaClient, newTimeline); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 5a190007b4f98..5e2a9fdf87957 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -253,8 +253,8 @@ public HoodieTimeline getTimeline() { @Override public void sync() { - preferredView.reset(); - secondaryView.reset(); + preferredView.sync(); + secondaryView.sync(); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index c3a3847fd769e..099b79cbba0ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -413,6 +413,8 @@ public Stream getAllReplacedFileGroups(String partitionPath) { public boolean refresh() { Map paramsMap = getParams(); try { + // refresh the local timeline first. + this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference() {}, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException(e); @@ -450,7 +452,6 @@ public void close() { @Override public void reset() { - timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); refresh(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index f5e14ba1dd34d..759e0f1a3e434 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -139,4 +139,9 @@ public Option getLatestCompactionTime() { public void close() throws Exception { // no-op } + + @Override + public void reset() { + // no-op + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 05d9d7349dbb8..c9e538f72eaa0 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -441,4 +441,10 @@ public Option getLatestCompactionTime() { } return Option.empty(); } + + @Override + public void reset() { + initIfNeeded(); + dataMetaClient.reloadActiveTimeline(); + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java index 453ec8f15ff08..d3b569ceb623a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java @@ -65,6 +65,18 @@ protected FileStatus[] listPartition(Path partitionPath) throws IOException { return tableMetadata.getAllFilesInPartition(partitionPath); } + @Override + public void reset() { + super.reset(); + tableMetadata.reset(); + } + + @Override + public void sync() { + super.sync(); + tableMetadata.reset(); + } + @Override public void close() { try { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index ff9dbae646303..d981b7085195b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -113,4 +113,9 @@ static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetad * Returns the timestamp of the latest compaction. */ Option getLatestCompactionTime(); + + /** + * Clear the states of the table metadata. + */ + void reset(); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java index 02a38e0baeb89..62cc23a99c943 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java @@ -622,8 +622,8 @@ public void testGetTimeline() { @Test public void testSync() { fsView.sync(); - verify(primary, times(1)).reset(); - verify(secondary, times(1)).reset(); + verify(primary, times(1)).sync(); + verify(secondary, times(1)).sync(); } @Test