From 9aa155e285138dc360e20316368f6736bc5ee62e Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Sun, 19 Dec 2021 01:51:22 -0800 Subject: [PATCH 1/2] [HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass tryLock fix and TestHoodieClientMultiWriter test fixes - Made FileSystemBasedLockProviderTestClass thread safe and fixed the tryLock retry logic. - Made TestHoodieClientMultiWriter. testHoodieClientBasicMultiWriter deterministic in verifying the HoodieWriteConflictException. --- .../FileSystemBasedLockProviderTestClass.java | 77 ++++++------ .../client/TestHoodieClientMultiWriter.java | 116 +++++++++++++----- 2 files changed, 128 insertions(+), 65 deletions(-) diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java index 2fc6ba4aa0853..97ad050e7240e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/FileSystemBasedLockProviderTestClass.java @@ -36,38 +36,37 @@ import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; /** - * This lock provider is used for testing purposes only. It provides a simple file system based lock using HDFS atomic - * create operation. This lock does not support cleaning/expiring the lock after a failed write hence cannot be used - * in production environments. + * This lock provider is used for testing purposes only. It provides a simple file system based lock + * using filesystem's atomic create operation. This lock does not support cleaning/expiring the lock + * after a failed write. Must not be used in production environments. */ public class FileSystemBasedLockProviderTestClass implements LockProvider, Serializable { - private static final String LOCK_NAME = "acquired"; + private static final String LOCK = "lock"; - private String lockPath; + private final int retryMaxCount; + private final int retryWaitTimeMs; private transient FileSystem fs; + private transient Path lockFile; protected LockConfiguration lockConfiguration; public FileSystemBasedLockProviderTestClass(final LockConfiguration lockConfiguration, final Configuration configuration) { this.lockConfiguration = lockConfiguration; - this.lockPath = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); - this.fs = FSUtils.getFs(this.lockPath, configuration); - } - - public void acquireLock() { - try { - fs.create(new Path(lockPath + "/" + LOCK_NAME), false).close(); - } catch (IOException e) { - throw new HoodieIOException("Failed to acquire lock", e); - } + final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY); + this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY); + this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY); + this.lockFile = new Path(lockDirectory + "/" + LOCK); + this.fs = FSUtils.getFs(this.lockFile.toString(), configuration); } @Override public void close() { - try { - fs.delete(new Path(lockPath + "/" + LOCK_NAME), true); - } catch (IOException e) { - throw new HoodieLockException("Unable to release lock", e); + synchronized (LOCK) { + try { + fs.delete(this.lockFile, true); + } catch (IOException e) { + throw new HoodieLockException("Unable to release lock: " + getLock(), e); + } } } @@ -75,39 +74,45 @@ public void close() { public boolean tryLock(long time, TimeUnit unit) { try { int numRetries = 0; - while (fs.exists(new Path(lockPath + "/" + LOCK_NAME)) - && (numRetries++ <= lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) { - Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY)); - } - synchronized (LOCK_NAME) { - if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) { - return false; + synchronized (LOCK) { + while (fs.exists(this.lockFile)) { + LOCK.wait(retryWaitTimeMs); + numRetries++; + if (numRetries > retryMaxCount) { + return false; + } } acquireLock(); + return fs.exists(this.lockFile); } - return true; } catch (IOException | InterruptedException e) { - throw new HoodieLockException("Failed to acquire lock", e); + throw new HoodieLockException("Failed to acquire lock: " + getLock(), e); } } @Override public void unlock() { - try { - if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) { - fs.delete(new Path(lockPath + "/" + LOCK_NAME), true); + synchronized (LOCK) { + try { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + } catch (IOException io) { + throw new HoodieIOException("Unable to delete lock " + getLock() + "on disk", io); } - } catch (IOException io) { - throw new HoodieIOException("Unable to delete lock on disk", io); } } @Override public String getLock() { + return this.lockFile.toString(); + } + + private void acquireLock() { try { - return fs.listStatus(new Path(lockPath))[0].getPath().toString(); - } catch (Exception e) { - throw new HoodieLockException("Failed to retrieve lock status from lock path " + lockPath); + fs.create(this.lockFile, false).close(); + } catch (IOException e) { + throw new HoodieIOException("Failed to acquire lock: " + getLock(), e); } } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 69fc401248cba..1fc1c58ad50fd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteConcurrencyMode; @@ -41,6 +42,7 @@ import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -53,6 +55,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -61,6 +64,7 @@ import java.util.stream.Collectors; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -84,9 +88,11 @@ public void clean() throws IOException { cleanupResources(); } - @ParameterizedTest - @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) - public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { + //@ParameterizedTest + //@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + @RepeatedTest(20) + public void testHoodieClientBasicMultiWriter() throws Exception { + HoodieTableType tableType = HoodieTableType.MERGE_ON_READ; if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } @@ -96,7 +102,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); - HoodieWriteConfig cfg = getConfigBuilder() + HoodieWriteConfig writeConfig = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) @@ -104,41 +110,64 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class) .build()).withAutoCommit(false).withProperties(properties).build(); + // Create the first commit - createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200); - ExecutorService executors = Executors.newFixedThreadPool(2); - SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); - SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); - AtomicBoolean writer1Conflict = new AtomicBoolean(false); - AtomicBoolean writer2Conflict = new AtomicBoolean(false); + createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200); + + final int threadCount = 2; + final ExecutorService executors = Executors.newFixedThreadPool(2); + final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig); + final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig); + + final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount); + final AtomicBoolean writer1Completed = new AtomicBoolean(false); + final AtomicBoolean writer2Completed = new AtomicBoolean(false); + Future future1 = executors.submit(() -> { - String newCommitTime = "004"; - int numRecords = 100; - String commitTimeBetweenPrevAndNew = "002"; try { - createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); - } catch (Exception e1) { - assertTrue(e1 instanceof HoodieWriteConflictException); - writer1Conflict.set(true); + final String nextCommitTime = "002"; + final JavaRDD writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100); + + // Wait for the 2nd writer to start the commit + cyclicBarrier.await(); + + // Commit the update before the 2nd writer + assertDoesNotThrow(() -> { + client1.commit(nextCommitTime, writeStatusList); + }); + + // Signal the 2nd writer to go ahead for his commit + cyclicBarrier.await(); + writer1Completed.set(true); + } catch (Exception e) { + writer1Completed.set(false); } }); + Future future2 = executors.submit(() -> { - String newCommitTime = "005"; - int numRecords = 100; - String commitTimeBetweenPrevAndNew = "002"; try { - createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); - } catch (Exception e2) { - assertTrue(e2 instanceof HoodieWriteConflictException); - writer2Conflict.set(true); + final String nextCommitTime = "003"; + + // Wait for the 1st writer to start the commit + cyclicBarrier.await(); + final JavaRDD writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100); + + // Wait for the 1st writer to complete the commit + cyclicBarrier.await(); + assertThrows(HoodieWriteConflictException.class, () -> { + client2.commit(nextCommitTime, writeStatusList); + }); + writer2Completed.set(true); + } catch (Exception e) { + writer2Completed.set(false); } }); + future1.get(); future2.get(); - Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), "Either of writer1 or writer2 should have failed " - + "with conflict"); - Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), "Both writer1 and writer2 should not result " - + "in conflict"); + + assertTrue(writer1Completed.get()); + assertTrue(writer2Completed.get()); } @Test @@ -443,4 +472,33 @@ private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient numRecords, 200, 2); client.commit(newCommitTime, result); } -} + + /** + * Start the commit for an update operation with given number of records + * + * @param writeConfig - Write config + * @param writeClient - Write client for starting the commit + * @param newCommitTime - Commit time for the update + * @param numRecords - Number of records to update + * @return RDD of write status from the update + * @throws Exception + */ + private JavaRDD startCommitForUpdate(HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient, + String newCommitTime, int numRecords) throws Exception { + // Start the new commit + writeClient.startCommitWithTime(newCommitTime); + + // Prepare update records + final Function2, String, Integer> recordGenFunction = + generateWrapRecordsFn(false, writeConfig, dataGen::generateUniqueUpdates); + final List records = recordGenFunction.apply(newCommitTime, numRecords); + final JavaRDD writeRecords = jsc.parallelize(records, 1); + + // Write updates + Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = SparkRDDWriteClient::upsert; + JavaRDD result = writeFn.apply(writeClient, writeRecords, newCommitTime); + List statuses = result.collect(); + assertNoWriteErrors(statuses); + return result; + } +} \ No newline at end of file From 70aed6e48f81f0b8a4496121cf353f709d654422 Mon Sep 17 00:00:00 2001 From: Manoj Govindassamy Date: Sun, 19 Dec 2021 08:36:10 -0800 Subject: [PATCH 2/2] [HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass tryLock fix and TestHoodieClientMultiWriter test fixes - Adding timeout for the cyclic barrier await to avoid long test timeouts --- .../client/TestHoodieClientMultiWriter.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 1fc1c58ad50fd..5f8b26bac9530 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -41,8 +41,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -88,11 +86,9 @@ public void clean() throws IOException { cleanupResources(); } - //@ParameterizedTest - //@EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) - @RepeatedTest(20) - public void testHoodieClientBasicMultiWriter() throws Exception { - HoodieTableType tableType = HoodieTableType.MERGE_ON_READ; + @ParameterizedTest + @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) + public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } @@ -129,7 +125,7 @@ public void testHoodieClientBasicMultiWriter() throws Exception { final JavaRDD writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100); // Wait for the 2nd writer to start the commit - cyclicBarrier.await(); + cyclicBarrier.await(60, TimeUnit.SECONDS); // Commit the update before the 2nd writer assertDoesNotThrow(() -> { @@ -137,7 +133,7 @@ public void testHoodieClientBasicMultiWriter() throws Exception { }); // Signal the 2nd writer to go ahead for his commit - cyclicBarrier.await(); + cyclicBarrier.await(60, TimeUnit.SECONDS); writer1Completed.set(true); } catch (Exception e) { writer1Completed.set(false); @@ -148,12 +144,12 @@ public void testHoodieClientBasicMultiWriter() throws Exception { try { final String nextCommitTime = "003"; - // Wait for the 1st writer to start the commit - cyclicBarrier.await(); + // Wait for the 1st writer to make progress with the commit + cyclicBarrier.await(60, TimeUnit.SECONDS); final JavaRDD writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100); // Wait for the 1st writer to complete the commit - cyclicBarrier.await(); + cyclicBarrier.await(60, TimeUnit.SECONDS); assertThrows(HoodieWriteConflictException.class, () -> { client2.commit(nextCommitTime, writeStatusList); }); @@ -166,8 +162,8 @@ public void testHoodieClientBasicMultiWriter() throws Exception { future1.get(); future2.get(); - assertTrue(writer1Completed.get()); - assertTrue(writer2Completed.get()); + // both should have been completed successfully. I mean, we already assert for conflict for writer2 at L155. + assertTrue(writer1Completed.get() && writer2Completed.get()); } @Test @@ -501,4 +497,4 @@ private JavaRDD startCommitForUpdate(HoodieWriteConfig writeConfig, assertNoWriteErrors(statuses); return result; } -} \ No newline at end of file +}