diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index af819bc9c5e70..6a01c4aa235dd 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -115,35 +115,31 @@ private void initKeyGenIfNeeded(boolean populateMetaFields) {
   }
 
   private JavaRDD<HoodieRecord<T>> clusteringHandleUpdate(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
-    if (config.isClusteringEnabled()) {
-      Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-          table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
-      UpdateStrategy updateStrategy = (UpdateStrategy)ReflectionUtils
-          .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
-      Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
-          (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>)updateStrategy.handleUpdate(inputRecordsRDD);
-      Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
-      if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
-        return recordsAndPendingClusteringFileGroups.getLeft();
-      }
-      // there are filegroups pending clustering and receiving updates, so rollback the pending clustering instants
-      // there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
-      if (config.isRollbackPendingClustering()) {
-        Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
-            .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
-            .map(Map.Entry::getValue)
-            .collect(Collectors.toSet());
-        pendingClusteringInstantsToRollback.forEach(instant -> {
-          String commitTime = HoodieActiveTimeline.createNewInstantTime();
-          table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
-          table.rollback(context, commitTime, instant, true, true);
-        });
-        table.getMetaClient().reloadActiveTimeline();
-      }
+    Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+        table.getFileSystemView().getFileGroupsInPendingClustering().map(entry -> entry.getKey()).collect(Collectors.toSet());
+    UpdateStrategy updateStrategy = (UpdateStrategy) ReflectionUtils
+        .loadClass(config.getClusteringUpdatesStrategyClass(), this.context, fileGroupsInPendingClustering);
+    Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>> recordsAndPendingClusteringFileGroups =
+        (Pair<JavaRDD<HoodieRecord<T>>, Set<HoodieFileGroupId>>) updateStrategy.handleUpdate(inputRecordsRDD);
+    Set<HoodieFileGroupId> fileGroupsWithUpdatesAndPendingClustering = recordsAndPendingClusteringFileGroups.getRight();
+    if (fileGroupsWithUpdatesAndPendingClustering.isEmpty()) {
       return recordsAndPendingClusteringFileGroups.getLeft();
-    } else {
-      return inputRecordsRDD;
     }
+    // there are filegroups pending clustering and receiving updates, so rollback the pending clustering instants
+    // there could be race condition, for example, if the clustering completes after instants are fetched but before rollback completed
+    if (config.isRollbackPendingClustering()) {
+      Set<HoodieInstant> pendingClusteringInstantsToRollback = getAllFileGroupsInPendingClusteringPlans(table.getMetaClient()).entrySet().stream()
+          .filter(e -> fileGroupsWithUpdatesAndPendingClustering.contains(e.getKey()))
+          .map(Map.Entry::getValue)
+          .collect(Collectors.toSet());
+      pendingClusteringInstantsToRollback.forEach(instant -> {
+        String commitTime = HoodieActiveTimeline.createNewInstantTime();
+        table.scheduleRollback(context, commitTime, instant, false, config.shouldRollbackUsingMarkers());
+        table.rollback(context, commitTime, instant, true, true);
+      });
+      table.getMetaClient().reloadActiveTimeline();
+    }
+    return recordsAndPendingClusteringFileGroups.getLeft();
   }
 
   @Override
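Note on the hunk above: dropping the `isClusteringEnabled()` guard means incoming updates are always checked against *pending* clustering plans, since another writer may have scheduled clustering even when the current writer has it disabled. The following is a minimal, self-contained sketch of that decision flow, not Hudi code; `FileGroupId`, `pendingPlans`, and the `rollback` callback are hypothetical stand-ins.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Toy model of the update-vs-pending-clustering decision in the hunk above.
// All names here are stand-ins, not Hudi classes.
final class ClusteringUpdateGuard {

  record FileGroupId(String partition, String fileId) {}

  /**
   * @param updatedFileGroups         file groups touched by the incoming upsert batch
   * @param pendingPlans              pending clustering instant -> file groups its plan covers
   * @param rollbackPendingClustering mirrors the isRollbackPendingClustering() switch above
   * @param rollback                  callback that rolls back one pending clustering instant
   */
  static void handleUpdates(Set<FileGroupId> updatedFileGroups,
                            Map<String, Set<FileGroupId>> pendingPlans,
                            boolean rollbackPendingClustering,
                            Consumer<String> rollback) {
    // File groups that are both receiving updates and covered by a pending plan.
    Set<FileGroupId> conflicting = pendingPlans.values().stream()
        .flatMap(Set::stream)
        .filter(updatedFileGroups::contains)
        .collect(Collectors.toSet());
    if (conflicting.isEmpty()) {
      return; // no overlap: the write proceeds untouched
    }
    if (!rollbackPendingClustering) {
      // Without the rollback option, the only safe move is to reject the update.
      throw new IllegalStateException("Updates hit file groups under pending clustering: " + conflicting);
    }
    // Roll back only the pending instants whose plans overlap the update set,
    // mirroring pendingClusteringInstantsToRollback in the hunk above.
    pendingPlans.entrySet().stream()
        .filter(e -> e.getValue().stream().anyMatch(conflicting::contains))
        .map(Map.Entry::getKey)
        .forEach(rollback);
  }
}
```

The design choice the patch makes is visible here: plans with no overlapping updates are left alone, so concurrent clustering of cold file groups can still complete.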
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index cef6641f1743e..afc4caea2d511 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -18,12 +18,6 @@
 
 package org.apache.hudi.client;
 
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -42,23 +36,31 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
 
@@ -105,7 +107,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
         try {
           createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
         } catch (Exception e1) {
-          Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
+          assertTrue(e1 instanceof HoodieWriteConflictException);
           throw new RuntimeException(e1);
         }
       });
@@ -116,13 +118,13 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E
         try {
          createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
         } catch (Exception e2) {
-          Assertions.assertTrue(e2 instanceof HoodieWriteConflictException);
+          assertTrue(e2 instanceof HoodieWriteConflictException);
           throw new RuntimeException(e2);
         }
       });
       future1.get();
       future2.get();
-      Assertions.fail("Should not reach here, this means concurrent writes were handled incorrectly");
+      fail("Should not reach here, this means concurrent writes were handled incorrectly");
     } catch (Exception e) {
       // Expected to fail due to overlapping commits
     }
@@ -206,7 +208,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
     properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
     // Disabling embedded timeline server, it doesn't work with multiwriter
-    HoodieWriteConfig cfg = getConfigBuilder()
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withAutoClean(false)
             .withInlineCompaction(false).withAsyncClean(true)
             .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
@@ -214,12 +216,12 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
         .withEmbeddedTimelineServerEnabled(false)
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(
             FileSystemViewStorageType.MEMORY).build())
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
-            .build()).withAutoCommit(false).withProperties(properties).build();
+            .build()).withAutoCommit(false).withProperties(properties);
     Set<String> validInstants = new HashSet<>();
     // Create the first commit with inserts
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
     SparkRDDWriteClient client = getHoodieWriteClient(cfg);
     createCommitWithInserts(cfg, client, "000", "001", 200);
     validInstants.add("001");
@@ -229,7 +231,11 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     validInstants.add("002");
     validInstants.add("003");
     ExecutorService executors = Executors.newFixedThreadPool(2);
-    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
+    // write config with clustering enabled
+    HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+        .build();
+    SparkRDDWriteClient client1 = getHoodieWriteClient(cfg2);
     SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
     // Create upserts, schedule cleaning, schedule compaction in parallel
     Future future1 = executors.submit(() -> {
@@ -237,14 +243,14 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
       int numRecords = 100;
       String commitTimeBetweenPrevAndNew = "002";
       try {
-        createCommitWithUpserts(cfg, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
+        createCommitWithUpserts(cfg2, client1, "003", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
         if (tableType == HoodieTableType.MERGE_ON_READ) {
-          Assertions.fail("Conflicts not handled correctly");
+          fail("Conflicts not handled correctly");
         }
         validInstants.add("004");
       } catch (Exception e1) {
         if (tableType == HoodieTableType.MERGE_ON_READ) {
-          Assertions.assertTrue(e1 instanceof HoodieWriteConflictException);
+          assertTrue(e1 instanceof HoodieWriteConflictException);
         }
       }
     });
@@ -272,7 +278,7 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
       String newCommitTime = "007";
       int numRecords = 100;
       try {
-        createCommitWithInserts(cfg, client1, "003", newCommitTime, numRecords);
+        createCommitWithInserts(cfg2, client1, "003", newCommitTime, numRecords);
         validInstants.add("007");
       } catch (Exception e1) {
         throw new RuntimeException(e1);
@@ -300,10 +306,13 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t
     future1.get();
     future2.get();
     future3.get();
-    Set<String> completedInstants = metaClient.getActiveTimeline().getCommitsTimeline()
+    validInstants.addAll(
+        metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
+            .filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet()));
+    Set<String> completedInstants = metaClient.reloadActiveTimeline().getCommitsTimeline()
         .filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toSet());
-    Assertions.assertTrue(validInstants.containsAll(completedInstants));
+    assertTrue(validInstants.containsAll(completedInstants));
   }
 
   @ParameterizedTest
@@ -314,13 +323,16 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
     }
     Properties properties = new Properties();
     properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
-    HoodieWriteConfig cfg = getConfigBuilder()
+    HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder()
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .withAutoClean(false).build())
-        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClusteringNumCommits(1).build())
         .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
         .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
-            .build()).withAutoCommit(false).withProperties(properties).build();
+            .build()).withAutoCommit(false).withProperties(properties);
+    HoodieWriteConfig cfg = writeConfigBuilder.build();
+    HoodieWriteConfig cfg2 = writeConfigBuilder
+        .withClusteringConfig(HoodieClusteringConfig.newBuilder().withInlineClustering(true).withInlineClusteringNumCommits(1).build())
+        .build();
     // Create the first commit
     createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
     // Start another inflight commit
@@ -333,8 +345,8 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
         numRecords, 200, 2);
     // Start and finish another commit while the previous writer for commit 003 is running
     newCommitTime = "004";
-    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
-    JavaRDD<WriteStatus> result2 = updateBatch(cfg, client2, newCommitTime, "001",
+    SparkRDDWriteClient client2 = getHoodieWriteClient(cfg2);
+    JavaRDD<WriteStatus> result2 = updateBatch(cfg2, client2, newCommitTime, "001",
         Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), "000", numRecords, SparkRDDWriteClient::upsert, false, false,
         numRecords, 200, 2);
     client2.commit(newCommitTime, result2);
@@ -342,11 +354,11 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType)
     SparkRDDWriteClient client3 = getHoodieWriteClient(cfg);
     // schedule clustering
     Option<String> clusterInstant = client3.scheduleTableService(Option.empty(), TableServiceType.CLUSTER);
-    client3.cluster(clusterInstant.get(), true);
+    assertFalse(clusterInstant.isPresent());
     // Attempt to commit the inflight commit 003
     try {
       client1.commit("003", result1);
-      Assertions.fail("Should have thrown a concurrent conflict exception");
+      fail("Should have thrown a concurrent conflict exception");
     } catch (Exception e) {
       // Expected
     }
@@ -376,5 +388,4 @@ private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient
         numRecords, 200, 2);
     client.commit(newCommitTime, result);
   }
-
 }
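Note on the test changes above: both tests now derive two write configs from one base builder, `cfg` (clustering off) and `cfg2` (inline clustering on every commit), so one writer both writes and clusters while the other only writes, and the late committer is expected to hit `HoodieWriteConflictException`. Below is a minimal, self-contained model of the optimistic-concurrency check the tests exercise; `CommittedTxn` and `filesTouched` are hypothetical names, and this is not Hudi's actual conflict resolution strategy.

```java
import java.util.List;
import java.util.Set;

// Toy model of optimistic concurrency control: a transaction may commit only if
// no transaction that committed after it started touched an overlapping file set.
final class OccCheck {

  record CommittedTxn(long commitMillis, Set<String> filesTouched) {}

  static void validateCommit(long txnStartMillis,
                             Set<String> filesTouched,
                             List<CommittedTxn> committed) {
    for (CommittedTxn other : committed) {
      // A transaction that completed after we started is a concurrent writer.
      boolean committedConcurrently = other.commitMillis() > txnStartMillis;
      boolean overlaps = other.filesTouched().stream().anyMatch(filesTouched::contains);
      if (committedConcurrently && overlaps) {
        // In the tests above this surfaces as HoodieWriteConflictException
        // when client1 tries to commit the stale inflight instant "003".
        throw new IllegalStateException("write conflict on files: " + filesTouched);
      }
    }
  }
}
```

This also explains the changed assertion on `clusterInstant`: once updates have already rolled back the pending clustering plan, scheduling clustering on the same writer yields no instant, so the test asserts `assertFalse(clusterInstant.isPresent())` instead of executing it.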
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 7e3f8f3292577..30d59baece096 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1418,8 +1418,7 @@ public void testPendingClusteringRollback(boolean populateMetaFields) throws Exc
     dataGen = new HoodieTestDataGenerator();
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
     allRecords.addAll(dataGen.generateInserts(commitTime, 200));
-    writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
-
+    assertThrows(HoodieUpsertException.class, () -> writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields));
    // verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering)
     client.rollback(pendingClusteringInstant.getTimestamp());
     metaClient.reloadActiveTimeline();
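The last hunk turns a previously passing write into an expected failure: with the stricter handling of updates against pending clustering, the batch now surfaces `HoodieUpsertException`, which the test asserts via JUnit 5's `assertThrows`. A small sketch of that pattern follows; `writeBatch()` and the exception type are stand-ins so the example stays self-contained.

```java
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.junit.jupiter.api.Test;

// Minimal illustration of the assertThrows pattern adopted above; writeBatch()
// is a hypothetical stand-in for writeAndVerifyBatch(client, allRecords, ...).
class ExpectedUpsertFailureSketchTest {

  @Test
  void upsertHittingPendingClusteringFails() {
    // assertThrows fails the test unless the lambda throws exactly this type
    // (or a subtype), and hands back the thrown instance for further checks.
    IllegalStateException thrown = assertThrows(IllegalStateException.class, this::writeBatch);
    assertNotNull(thrown.getMessage());
  }

  private void writeBatch() {
    throw new IllegalStateException("updates touch file groups under pending clustering");
  }
}
```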