diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java new file mode 100644 index 0000000000000..96a42e8409b29 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/FileSystemBasedLockProvider.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client.transaction.lock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.lock.LockProvider; +import org.apache.hudi.common.lock.LockState; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieLockException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; + +/** + * A FileSystem based lock. This {@link LockProvider} implementation allows to lock table operations + * using DFS. Users might need to manually clean the Locker's path if writeClient crash and never run again. + * NOTE: This only works for DFS with atomic create/delete operation + */ +public class FileSystemBasedLockProvider implements LockProvider, Serializable { + + private static final Logger LOG = LogManager.getLogger(FileSystemBasedLockProvider.class); + + private static final String LOCK_FILE_NAME = "lock"; + + private final int lockTimeoutMinutes; + private transient FileSystem fs; + private transient Path lockFile; + protected LockConfiguration lockConfiguration; + + public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, final Configuration configuration) { + checkRequiredProps(lockConfiguration); + this.lockConfiguration = lockConfiguration; + String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null); + if (StringUtils.isNullOrEmpty(lockDirectory)) { + lockDirectory = lockConfiguration.getConfig().getString(HoodieWriteConfig.BASE_PATH.key()) + + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME; + } + this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY); + this.lockFile = new Path(lockDirectory + Path.SEPARATOR + LOCK_FILE_NAME); + this.fs = FSUtils.getFs(this.lockFile.toString(), configuration); + } + + @Override + public void close() { + synchronized (LOCK_FILE_NAME) { + try { + fs.delete(this.lockFile, true); + } catch (IOException e) { + throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e); + } + } + } + + @Override + public boolean tryLock(long time, TimeUnit unit) { + try { + synchronized (LOCK_FILE_NAME) { + // Check whether lock is already expired, if so try to delete lock file + if (fs.exists(this.lockFile) && checkIfExpired()) { + fs.delete(this.lockFile, true); + } + acquireLock(); + return fs.exists(this.lockFile); + } + } catch (IOException | HoodieIOException e) { + LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + return false; + } + } + + @Override + public void unlock() { + synchronized (LOCK_FILE_NAME) { + try { + if (fs.exists(this.lockFile)) { + fs.delete(this.lockFile, true); + } + } catch (IOException io) { + throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io); + } + } + } + + @Override + public String getLock() { + return this.lockFile.toString(); + } + + private boolean checkIfExpired() { + if (lockTimeoutMinutes == 0) { + return false; + } + try { + long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime(); + if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000) { + return true; + } + } catch (IOException | HoodieIOException e) { + LOG.error(generateLogStatement(LockState.ALREADY_RELEASED) + " failed to get lockFile's modification time", e); + } + return false; + } + + private void acquireLock() { + try { + fs.create(this.lockFile, false).close(); + } catch (IOException e) { + throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e); + } + } + + protected String generateLogStatement(LockState state) { + return StringUtils.join(state.name(), " lock at: ", getLock()); + } + + private void checkRequiredProps(final LockConfiguration config) { + ValidationUtils.checkArgument(config.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY, null) != null + || config.getConfig().getString(HoodieWriteConfig.BASE_PATH.key(), null) != null); + ValidationUtils.checkArgument(config.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) >= 0); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java index 9ea28fbbd42e7..7fcc96810be2c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java @@ -36,6 +36,7 @@ import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_CONNECTION_TIMEOUT_MS; import static org.apache.hudi.common.config.LockConfiguration.DEFAULT_ZK_SESSION_TIMEOUT_MS; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_DATABASE_NAME_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.HIVE_METASTORE_URI_PROP_KEY; @@ -106,7 +107,13 @@ public class HoodieLockConfig extends HoodieConfig { .key(FILESYSTEM_LOCK_PATH_PROP_KEY) .noDefaultValue() .sinceVersion("0.8.0") - .withDocumentation("For DFS based lock providers, path to store the locks under."); + .withDocumentation("For DFS based lock providers, path to store the locks under. use Table's meta path as default"); + + public static final ConfigProperty FILESYSTEM_LOCK_EXPIRE = ConfigProperty + .key(FILESYSTEM_LOCK_EXPIRE_PROP_KEY) + .defaultValue(0) + .sinceVersion("0.12.0") + .withDocumentation("For DFS based lock providers, expire time in minutes, must be a nonnegative number, default means no expire"); public static final ConfigProperty HIVE_DATABASE_NAME = ConfigProperty .key(HIVE_DATABASE_NAME_PROP_KEY) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java new file mode 100644 index 0000000000000..208e9cd62e738 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestFileBasedLockProvider.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; +import org.apache.hudi.common.config.LockConfiguration; +import org.apache.hudi.common.testutils.minicluster.HdfsTestService; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieLockException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; + +public class TestFileBasedLockProvider { + private static HdfsTestService hdfsTestService; + private static MiniDFSCluster dfsCluster; + private static LockConfiguration lockConfiguration; + private static Configuration hadoopConf; + + @BeforeAll + public static void setup() throws IOException { + hdfsTestService = new HdfsTestService(); + dfsCluster = hdfsTestService.start(true); + hadoopConf = dfsCluster.getFileSystem().getConf(); + + Properties properties = new Properties(); + properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); + properties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); + properties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + properties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); + properties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); + lockConfiguration = new LockConfiguration(properties); + } + + @AfterAll + public static void cleanUpAfterAll() throws IOException { + Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); + FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); + fs.delete(new Path("/tmp"), true); + if (hdfsTestService != null) { + hdfsTestService.stop(); + hdfsTestService = null; + } + } + + @AfterEach + public void cleanUpAfterEach() throws IOException { + Path workDir = dfsCluster.getFileSystem().getWorkingDirectory(); + FileSystem fs = workDir.getFileSystem(hdfsTestService.getHadoopConf()); + fs.delete(new Path("/tmp/lock"), true); + } + + @Test + public void testAcquireLock() { + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + } + + @Test + public void testAcquireLockWithDefaultPath() { + lockConfiguration.getConfig().remove(FILESYSTEM_LOCK_PATH_PROP_KEY); + lockConfiguration.getConfig().setProperty(HoodieWriteConfig.BASE_PATH.key(), "/tmp/"); + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + lockConfiguration.getConfig().setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, "/tmp/"); + } + + @Test + public void testUnLock() { + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + } + + @Test + public void testReentrantLock() { + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + Assertions.assertTrue(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + Assertions.assertFalse(fileBasedLockProvider.tryLock(lockConfiguration.getConfig() + .getLong(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY), TimeUnit.MILLISECONDS)); + fileBasedLockProvider.unlock(); + } + + @Test + public void testUnlockWithoutLock() { + try { + FileSystemBasedLockProvider fileBasedLockProvider = new FileSystemBasedLockProvider(lockConfiguration, hadoopConf); + fileBasedLockProvider.unlock(); + } catch (HoodieLockException e) { + Assertions.fail(); + } + } + +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 268674e78d87a..6ad8666a0fa20 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.client.transaction.lock.InProcessLockProvider; import org.apache.hudi.common.config.LockConfiguration; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; @@ -45,9 +46,11 @@ import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import java.io.IOException; import java.util.ArrayList; @@ -66,7 +69,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_EXPIRE_PROP_KEY; import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY; +import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -76,6 +83,21 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase { + private Properties lockProperties = null; + + @BeforeEach + public void setup() throws IOException { + if (lockProperties == null) { + lockProperties = new Properties(); + lockProperties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(FILESYSTEM_LOCK_EXPIRE_PROP_KEY, "1"); + lockProperties.setProperty(LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "1000"); + lockProperties.setProperty(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "1000"); + lockProperties.setProperty(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, "3"); + } + } + public void setUpMORTestTable() throws IOException { cleanupResources(); initPath(); @@ -92,15 +114,27 @@ public void clean() throws IOException { cleanupResources(); } + private static final List LOCK_PROVIDER_CLASSES = Arrays.asList( + InProcessLockProvider.class, + FileSystemBasedLockProvider.class); + + private static Iterable providerClassAndTableType() { + List opts = new ArrayList<>(); + for (Object providerClass : LOCK_PROVIDER_CLASSES) { + opts.add(new Object[] {HoodieTableType.COPY_ON_WRITE, providerClass}); + opts.add(new Object[] {HoodieTableType.MERGE_ON_READ, providerClass}); + } + return opts; + } + @ParameterizedTest - @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) - public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws Exception { + @MethodSource("providerClassAndTableType") + public void testHoodieClientBasicMultiWriter(HoodieTableType tableType, Class providerClass) throws Exception { if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + HoodieWriteConfig writeConfig = getConfigBuilder() .withCleanConfig(HoodieCleanConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) @@ -110,8 +144,8 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(false).withProperties(properties).build(); + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .build()).withAutoCommit(false).withProperties(lockProperties).build(); // Create the first commit createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200, true); @@ -172,16 +206,6 @@ public void testHoodieClientBasicMultiWriter(HoodieTableType tableType) throws E assertTrue(writer1Completed.get() && writer2Completed.get()); } - @Test - public void testMultiWriterWithAsyncTableServicesWithConflictCOW() throws Exception { - testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.COPY_ON_WRITE); - } - - @Test - public void testMultiWriterWithAsyncTableServicesWithConflictMOR() throws Exception { - testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType.MERGE_ON_READ); - } - @ParameterizedTest @EnumSource(value = HoodieTableType.class, names = {"COPY_ON_WRITE", "MERGE_ON_READ"}) public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType tableType) throws Exception { @@ -189,11 +213,9 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "3000"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "20"); HoodieWriteConfig cfg = getConfigBuilder() .withCleanConfig(HoodieCleanConfig.newBuilder() @@ -210,7 +232,7 @@ public void testMultiWriterWithInsertsToDistinctPartitions(HoodieTableType table .withAutoCommit(false) // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) - .withProperties(properties) + .withProperties(lockProperties) .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.REMOTE_FIRST) .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build()) .build(); @@ -260,15 +282,13 @@ private void latchCountDownAndWait(CountDownLatch latch, long waitTimeMillis) { } } - private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType) throws Exception { + @ParameterizedTest + @MethodSource("providerClassAndTableType") + public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType tableType, Class providerClass) throws Exception { // create inserts X 1 if (tableType == HoodieTableType.MERGE_ON_READ) { setUpMORTestTable(); } - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); // Disabling embedded timeline server, it doesn't work with multiwriter HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCleanConfig(HoodieCleanConfig.newBuilder() @@ -284,8 +304,8 @@ private void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType t .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withStorageType( FileSystemViewStorageType.MEMORY).build()) .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL) - .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(false).withProperties(properties); + .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(providerClass) + .build()).withAutoCommit(false).withProperties(lockProperties); Set validInstants = new HashSet<>(); // Create the first commit with inserts HoodieWriteConfig cfg = writeConfigBuilder.build(); @@ -458,10 +478,7 @@ public void testHoodieClientMultiWriterWithClustering(HoodieTableType tableType) @Test public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception { - Properties properties = new Properties(); - properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY, "3000"); - properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); + lockProperties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "100"); HoodieWriteConfig.Builder writeConfigBuilder = getConfigBuilder() .withCleanConfig(HoodieCleanConfig.newBuilder() .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY) @@ -470,7 +487,7 @@ public void testHoodieClientMultiWriterAutoCommitForConflict() throws Exception // Timeline-server-based markers are not used for multi-writer tests .withMarkersType(MarkerType.DIRECT.name()) .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class) - .build()).withAutoCommit(true).withProperties(properties); + .build()).withAutoCommit(true).withProperties(lockProperties); HoodieWriteConfig cfg = writeConfigBuilder.build(); HoodieWriteConfig cfg2 = writeConfigBuilder.build(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java index 0109f22097a31..c6ebc54e95d78 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/LockConfiguration.java @@ -48,6 +48,8 @@ public class LockConfiguration implements Serializable { public static final String FILESYSTEM_LOCK_PATH_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "path"; + public static final String FILESYSTEM_LOCK_EXPIRE_PROP_KEY = FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX + "expire"; + // configs for metastore based locks public static final String HIVE_METASTORE_LOCK_PROPERTY_PREFIX = LOCK_PREFIX + "hivemetastore.";