From 20195712c81096076a3576edd382bcaa19937918 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sat, 18 Dec 2021 10:51:31 -0800 Subject: [PATCH 1/2] Fixing default lock configs for FileSystemBasedLock --- .../hudi/client/TestHoodieClientMultiWriter.java | 14 ++++++++++++-- .../hudi/common/config/LockConfiguration.java | 4 ++-- .../TestHoodieDeltaStreamerWithMultiWriter.java | 8 ++++---- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 60979c33521d1..d8fc28c55ed71 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -40,6 +40,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -56,6 +57,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; @@ -90,6 +92,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E } Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) @@ -103,6 +107,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E ExecutorService executors = Executors.newFixedThreadPool(2); SparkRDDWriteClient client1 = getHoodieWriteClient(cfg); SparkRDDWriteClient client2 = getHoodieWriteClient(cfg); + AtomicBoolean writer1Conflict = new AtomicBoolean(false); + AtomicBoolean writer2Conflict = new AtomicBoolean(false); Future future1 = executors.submit(() -> { String newCommitTime = "004"; int numRecords = 100; @@ -111,7 +117,7 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e1) { assertTrue(e1 instanceof HoodieWriteConflictException); - throw new RuntimeException(e1); + writer1Conflict.set(true); } }); Future future2 = executors.submit(() -> { @@ -122,11 +128,15 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords); } catch (Exception e2) { assertTrue(e2 instanceof HoodieWriteConflictException); - throw new RuntimeException(e2); + writer2Conflict.set(true); } }); future1.get(); future2.get(); + Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), "Either of writer1 or writer2 should have failed " + + "with conflict"); + Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), "Both writer1 and writer2 should not result " + + "in conflict"); } @Test diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 41d8cd775f3a1..8104d6a37e131 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -30,14 +30,14 @@ public class LockConfiguration implements Serializable { public static final String LOCK_PREFIX = "hoodie.write.lock."; public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "wait_time_ms_between_retry"; - public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L); + public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(250L); public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "max_wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "client.wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "num_retries"; - public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3); + public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(10); public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "client.num_retries"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 89f4cfc77f0ff..137bd14cb0559 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -92,8 +92,8 @@ void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType ta TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -132,8 +132,8 @@ void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableTyp TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); - props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict From efd48c6cf5eed1d47f06fd71123c167cb0750ce5 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Sat, 18 Dec 2021 13:05:01 -0800 Subject: [PATCH 2/2] Fixing default configs --- .../org/apache/hudi/client/TestHoodieClientMultiWriter.java | 4 +++- .../java/org/apache/hudi/common/config/LockConfiguration.java | 4 ++-- .../functional/TestHoodieDeltaStreamerWithMultiWriter.java | 4 ++++ 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index d8fc28c55ed71..69fc401248cba 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -93,7 +93,9 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E Properties properties = new Properties(); properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "200"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); + properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); HoodieWriteConfig cfg = getConfigBuilder() .withCompactionConfig(HoodieCompactionConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 8104d6a37e131..0109f22097a31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -30,14 +30,14 @@ public class LockConfiguration implements Serializable { public static final String LOCK_PREFIX = "hoodie.write.lock."; public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "wait_time_ms_between_retry"; - public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(250L); + public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(1000L); public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "max_wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "client.wait_time_ms_between_retry"; public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "num_retries"; - public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(10); + public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(15); public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "client.num_retries"; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 137bd14cb0559..dca5f0fdf9293 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -94,6 +94,8 @@ void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType ta props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT, @@ -134,6 +136,8 @@ void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableTyp props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10"); props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10"); UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict